Fixes in CA for vendor update
This commit is contained in:
parent
07870d830c
commit
70ef92a12a
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
|
@ -90,14 +91,14 @@ func WriteStatusConfigMap(kubeClient kube_client.Interface, namespace string, ms
|
|||
var getStatusError, writeStatusError error
|
||||
var errMsg string
|
||||
maps := kubeClient.CoreV1().ConfigMaps(namespace)
|
||||
configMap, getStatusError = maps.Get(StatusConfigMapName, metav1.GetOptions{})
|
||||
configMap, getStatusError = maps.Get(context.TODO(), StatusConfigMapName, metav1.GetOptions{})
|
||||
if getStatusError == nil {
|
||||
configMap.Data["status"] = statusMsg
|
||||
if configMap.ObjectMeta.Annotations == nil {
|
||||
configMap.ObjectMeta.Annotations = make(map[string]string)
|
||||
}
|
||||
configMap.ObjectMeta.Annotations[ConfigMapLastUpdatedKey] = statusUpdateTime
|
||||
configMap, writeStatusError = maps.Update(configMap)
|
||||
configMap, writeStatusError = maps.Update(context.TODO(), configMap, metav1.UpdateOptions{})
|
||||
} else if kube_errors.IsNotFound(getStatusError) {
|
||||
configMap = &apiv1.ConfigMap{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
|
|
@ -111,7 +112,7 @@ func WriteStatusConfigMap(kubeClient kube_client.Interface, namespace string, ms
|
|||
"status": statusMsg,
|
||||
},
|
||||
}
|
||||
configMap, writeStatusError = maps.Create(configMap)
|
||||
configMap, writeStatusError = maps.Create(context.TODO(), configMap, metav1.CreateOptions{})
|
||||
} else {
|
||||
errMsg = fmt.Sprintf("Failed to retrieve status configmap for update: %v", getStatusError)
|
||||
}
|
||||
|
|
@ -134,7 +135,7 @@ func WriteStatusConfigMap(kubeClient kube_client.Interface, namespace string, ms
|
|||
// DeleteStatusConfigMap deletes status configmap
|
||||
func DeleteStatusConfigMap(kubeClient kube_client.Interface, namespace string) error {
|
||||
maps := kubeClient.CoreV1().ConfigMaps(namespace)
|
||||
err := maps.Delete(StatusConfigMapName, &metav1.DeleteOptions{})
|
||||
err := maps.Delete(context.TODO(), StatusConfigMapName, &metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
klog.Error("Failed to delete status configmap")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package core
|
||||
|
||||
import (
|
||||
ctx "context"
|
||||
"fmt"
|
||||
"math"
|
||||
"reflect"
|
||||
|
|
@ -1230,7 +1231,7 @@ func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface
|
|||
for start := time.Now(); time.Now().Sub(start) < time.Duration(maxGracefulTerminationSec)*time.Second+podEvictionHeadroom; time.Sleep(5 * time.Second) {
|
||||
allGone = true
|
||||
for _, pod := range pods {
|
||||
podreturned, err := client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
|
||||
podreturned, err := client.CoreV1().Pods(pod.Namespace).Get(ctx.TODO(), pod.Name, metav1.GetOptions{})
|
||||
if err == nil && (podreturned == nil || podreturned.Spec.NodeName == node.Name) {
|
||||
klog.Errorf("Not deleted yet %s/%s", pod.Namespace, pod.Name)
|
||||
allGone = false
|
||||
|
|
@ -1250,7 +1251,7 @@ func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface
|
|||
}
|
||||
|
||||
for _, pod := range pods {
|
||||
podReturned, err := client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
|
||||
podReturned, err := client.CoreV1().Pods(pod.Namespace).Get(ctx.TODO(), pod.Name, metav1.GetOptions{})
|
||||
if err == nil && (podReturned == nil || podReturned.Spec.NodeName == node.Name) {
|
||||
evictionResults[pod.Name] = status.PodEvictionResult{Pod: pod, TimedOut: true, Err: nil}
|
||||
} else if err != nil && !kube_errors.IsNotFound(err) {
|
||||
|
|
|
|||
|
|
@ -17,13 +17,15 @@ limitations under the License.
|
|||
package core
|
||||
|
||||
import (
|
||||
ctx "context"
|
||||
"fmt"
|
||||
"k8s.io/autoscaler/cluster-autoscaler/simulator"
|
||||
autoscaler_errors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/autoscaler/cluster-autoscaler/simulator"
|
||||
autoscaler_errors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
|
||||
|
||||
batchv1 "k8s.io/api/batch/v1"
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
policyv1 "k8s.io/api/policy/v1beta1"
|
||||
|
|
@ -1582,7 +1584,7 @@ func TestCheckScaleDownDeltaWithinLimits(t *testing.T) {
|
|||
|
||||
func getNode(t *testing.T, client kube_client.Interface, name string) *apiv1.Node {
|
||||
t.Helper()
|
||||
node, err := client.CoreV1().Nodes().Get(name, metav1.GetOptions{})
|
||||
node, err := client.CoreV1().Nodes().Get(ctx.TODO(), name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to retrieve node %v: %v", name, err)
|
||||
}
|
||||
|
|
@ -1596,7 +1598,7 @@ func hasDeletionCandidateTaint(t *testing.T, client kube_client.Interface, name
|
|||
|
||||
func getAllNodes(t *testing.T, client kube_client.Interface) []*apiv1.Node {
|
||||
t.Helper()
|
||||
nodeList, err := client.CoreV1().Nodes().List(metav1.ListOptions{})
|
||||
nodeList, err := client.CoreV1().Nodes().List(ctx.TODO(), metav1.ListOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to retrieve list of nodes: %v", err)
|
||||
}
|
||||
|
|
@ -1641,9 +1643,9 @@ func TestSoftTaint(t *testing.T) {
|
|||
p1200.Spec.NodeName = "n2000"
|
||||
|
||||
fakeClient := fake.NewSimpleClientset()
|
||||
_, err = fakeClient.CoreV1().Nodes().Create(n1000)
|
||||
_, err = fakeClient.CoreV1().Nodes().Create(ctx.TODO(), n1000, metav1.CreateOptions{})
|
||||
assert.NoError(t, err)
|
||||
_, err = fakeClient.CoreV1().Nodes().Create(n2000)
|
||||
_, err = fakeClient.CoreV1().Nodes().Create(ctx.TODO(), n2000, metav1.CreateOptions{})
|
||||
assert.NoError(t, err)
|
||||
|
||||
provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error {
|
||||
|
|
@ -1757,9 +1759,9 @@ func TestSoftTaintTimeLimit(t *testing.T) {
|
|||
}()
|
||||
|
||||
fakeClient := fake.NewSimpleClientset()
|
||||
_, err := fakeClient.CoreV1().Nodes().Create(n1)
|
||||
_, err := fakeClient.CoreV1().Nodes().Create(ctx.TODO(), n1, metav1.CreateOptions{})
|
||||
assert.NoError(t, err)
|
||||
_, err = fakeClient.CoreV1().Nodes().Create(n2)
|
||||
_, err = fakeClient.CoreV1().Nodes().Create(ctx.TODO(), n2, metav1.CreateOptions{})
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Move time forward when updating
|
||||
|
|
|
|||
|
|
@ -390,7 +390,7 @@ func main() {
|
|||
kubeClient := createKubeClient(getKubeConfig())
|
||||
|
||||
// Validate that the client is ok.
|
||||
_, err = kubeClient.CoreV1().Nodes().List(metav1.ListOptions{})
|
||||
_, err = kubeClient.CoreV1().Nodes().List(ctx.TODO(), metav1.ListOptions{})
|
||||
if err != nil {
|
||||
klog.Fatalf("Failed to get nodes from apiserver: %v", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ import (
|
|||
scheduler_listers "k8s.io/kubernetes/pkg/scheduler/listers"
|
||||
scheduler_nodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
|
||||
scheduler_volumebinder "k8s.io/kubernetes/pkg/scheduler/volumebinder"
|
||||
|
||||
// We need to import provider to initialize default scheduler.
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
|
||||
)
|
||||
|
|
@ -49,8 +50,8 @@ type SchedulerBasedPredicateChecker struct {
|
|||
// NewSchedulerBasedPredicateChecker builds scheduler based PredicateChecker.
|
||||
func NewSchedulerBasedPredicateChecker(kubeClient kube_client.Interface, stop <-chan struct{}) (*SchedulerBasedPredicateChecker, error) {
|
||||
informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
|
||||
providerRegistry := algorithmprovider.NewRegistry(1) // 1 here is hardPodAffinityWeight not relevant for CA
|
||||
config := providerRegistry[scheduler_apis_config.SchedulerDefaultProviderName]
|
||||
providerRegistry := algorithmprovider.NewRegistry()
|
||||
plugins := providerRegistry[scheduler_apis_config.SchedulerDefaultProviderName]
|
||||
sharedLister := NewDelegatingSchedulerSharedLister()
|
||||
|
||||
volumeBinder := scheduler_volumebinder.NewVolumeBinder(
|
||||
|
|
@ -65,8 +66,8 @@ func NewSchedulerBasedPredicateChecker(kubeClient kube_client.Interface, stop <-
|
|||
|
||||
framework, err := scheduler_framework.NewFramework(
|
||||
scheduler_plugins.NewInTreeRegistry(),
|
||||
config.FrameworkPlugins,
|
||||
config.FrameworkPluginConfig,
|
||||
plugins,
|
||||
nil, // This is fine.
|
||||
scheduler_framework.WithInformerFactory(informerFactory),
|
||||
scheduler_framework.WithSnapshotSharedLister(sharedLister),
|
||||
scheduler_framework.WithVolumeBinder(volumeBinder),
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package deletetaint
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
|
@ -73,7 +74,7 @@ func addTaint(node *apiv1.Node, client kube_client.Interface, taintKey string, e
|
|||
for {
|
||||
if refresh {
|
||||
// Get the newest version of the node.
|
||||
freshNode, err = client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
|
||||
freshNode, err = client.CoreV1().Nodes().Get(context.TODO(), node.Name, metav1.GetOptions{})
|
||||
if err != nil || freshNode == nil {
|
||||
klog.Warningf("Error while adding %v taint on node %v: %v", getKeyShortName(taintKey), node.Name, err)
|
||||
return fmt.Errorf("failed to get node %v: %v", node.Name, err)
|
||||
|
|
@ -88,7 +89,7 @@ func addTaint(node *apiv1.Node, client kube_client.Interface, taintKey string, e
|
|||
}
|
||||
return nil
|
||||
}
|
||||
_, err = client.CoreV1().Nodes().Update(freshNode)
|
||||
_, err = client.CoreV1().Nodes().Update(context.TODO(), freshNode, metav1.UpdateOptions{})
|
||||
if err != nil && errors.IsConflict(err) && time.Now().Before(retryDeadline) {
|
||||
refresh = true
|
||||
time.Sleep(conflictRetryInterval)
|
||||
|
|
@ -180,7 +181,7 @@ func cleanTaint(node *apiv1.Node, client kube_client.Interface, taintKey string)
|
|||
for {
|
||||
if refresh {
|
||||
// Get the newest version of the node.
|
||||
freshNode, err = client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
|
||||
freshNode, err = client.CoreV1().Nodes().Get(context.TODO(), node.Name, metav1.GetOptions{})
|
||||
if err != nil || freshNode == nil {
|
||||
klog.Warningf("Error while adding %v taint on node %v: %v", getKeyShortName(taintKey), node.Name, err)
|
||||
return false, fmt.Errorf("failed to get node %v: %v", node.Name, err)
|
||||
|
|
@ -204,7 +205,7 @@ func cleanTaint(node *apiv1.Node, client kube_client.Interface, taintKey string)
|
|||
}
|
||||
|
||||
freshNode.Spec.Taints = newTaints
|
||||
_, err = client.CoreV1().Nodes().Update(freshNode)
|
||||
_, err = client.CoreV1().Nodes().Update(context.TODO(), freshNode, metav1.UpdateOptions{})
|
||||
|
||||
if err != nil && errors.IsConflict(err) && time.Now().Before(retryDeadline) {
|
||||
refresh = true
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package deletetaint
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
|
|
@ -191,7 +192,7 @@ func setConflictRetryInterval(interval time.Duration) time.Duration {
|
|||
|
||||
func getNode(t *testing.T, client kube_client.Interface, name string) *apiv1.Node {
|
||||
t.Helper()
|
||||
node, err := client.CoreV1().Nodes().Get(name, metav1.GetOptions{})
|
||||
node, err := client.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to retrieve node %v: %v", name, err)
|
||||
}
|
||||
|
|
@ -203,7 +204,7 @@ func buildFakeClient(t *testing.T, nodes ...*apiv1.Node) *fake.Clientset {
|
|||
fakeClient := fake.NewSimpleClientset()
|
||||
|
||||
for _, node := range nodes {
|
||||
_, err := fakeClient.CoreV1().Nodes().Create(node)
|
||||
_, err := fakeClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue