Merge pull request #3177 from elmiko/issue/3104

Fix stale replicas issue with cluster-autoscaler CAPI provider
Kubernetes Prow Robot 2020-06-03 00:40:17 -07:00 committed by GitHub
commit 1ae89b93b0
9 changed files with 344 additions and 68 deletions
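At a high level, the fix changes the provider's scalableResource interface so that Replicas() returns (int32, error) and re-reads the MachineSet or MachineDeployment through the controller (the diff names the result freshMachineSet/freshMachineDeployment), falling back to status.replicas when spec.replicas is unset. The following is a minimal, self-contained sketch of that shape; the scalableResource signatures mirror the diff below, while fakeResource and its fields are invented purely for illustration and are not the provider's real implementation.

package main

import (
	"errors"
	"fmt"
)

// scalableResource mirrors the provider interface after this change:
// Replicas now returns an error so callers can react to missing or
// stale objects instead of silently reading a cached value.
type scalableResource interface {
	Replicas() (int32, error)
	SetSize(nreplicas int32) error
}

// fakeResource is a stand-in used only for this illustration.
type fakeResource struct {
	specReplicas   *int32 // nil models spec.replicas being unset
	statusReplicas int32
}

func (f *fakeResource) Replicas() (int32, error) {
	if f == nil {
		return 0, errors.New("unknown resource")
	}
	if f.specReplicas == nil {
		// fall back to status.replicas, matching the diff below
		return f.statusReplicas, nil
	}
	return *f.specReplicas, nil
}

func (f *fakeResource) SetSize(nreplicas int32) error {
	f.specReplicas = &nreplicas
	return nil
}

func main() {
	var sr scalableResource = &fakeResource{statusReplicas: 3}
	n, err := sr.Replicas()
	fmt.Println(n, err) // 3 <nil>
}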


@ -21,6 +21,7 @@ import (
"fmt"
"os"
"strings"
"sync"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -65,6 +66,7 @@ type machineController struct {
machineSetResource *schema.GroupVersionResource
machineResource *schema.GroupVersionResource
machineDeploymentResource *schema.GroupVersionResource
accessLock sync.Mutex
}
type machineSetFilterFunc func(machineSet *MachineSet) error
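The new accessLock field serializes concurrent scale-down paths: DeleteNodes (further down in this diff) takes the lock before reading the current replica count and marking machines for deletion, so two in-flight deletions cannot both pass the minimum-size check. Below is a small standalone sketch of that read-check-update pattern under a mutex; the group type, deleteNodes, and the sizes are hypothetical names for illustration only.

package main

import (
	"fmt"
	"sync"
)

// group is a toy stand-in for a node group guarded by a shared mutex,
// illustrating why the controller-level accessLock is taken in DeleteNodes.
type group struct {
	mu       sync.Mutex
	replicas int32
	minSize  int32
}

// deleteNodes removes n replicas unless that would drop below minSize.
// Without the lock, two concurrent callers could both pass the check
// and together scale below the minimum.
func (g *group) deleteNodes(n int32) error {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.replicas-n < g.minSize {
		return fmt.Errorf("unable to delete %d replicas: replicas=%d minSize=%d", n, g.replicas, g.minSize)
	}
	g.replicas -= n
	return nil
}

func main() {
	g := &group{replicas: 5, minSize: 1}
	fmt.Println(g.deleteNodes(3)) // <nil>
	fmt.Println(g.deleteNodes(3)) // error: would go below minSize
}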


@ -24,6 +24,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog"
"k8s.io/utils/pointer"
)
@ -75,8 +76,22 @@ func (r machineDeploymentScalableResource) Nodes() ([]string, error) {
return result, nil
}
func (r machineDeploymentScalableResource) Replicas() int32 {
return pointer.Int32PtrDerefOr(r.machineDeployment.Spec.Replicas, 0)
func (r machineDeploymentScalableResource) Replicas() (int32, error) {
freshMachineDeployment, err := r.controller.getMachineDeployment(r.machineDeployment.Namespace, r.machineDeployment.Name, metav1.GetOptions{})
if err != nil {
return 0, err
}
if freshMachineDeployment == nil {
return 0, fmt.Errorf("unknown machineDeployment %s", r.machineDeployment.Name)
}
if freshMachineDeployment.Spec.Replicas == nil {
klog.Warningf("MachineDeployment %q has nil spec.replicas. This is unsupported behaviour. Falling back to status.replicas.", r.machineDeployment.Name)
}
// If no value for replicas on the MachineDeployment spec, fall back to the status
// TODO: Remove this fallback once defaulting is implemented for MachineDeployment Replicas
return pointer.Int32PtrDerefOr(freshMachineDeployment.Spec.Replicas, freshMachineDeployment.Status.Replicas), nil
}
func (r machineDeploymentScalableResource) SetSize(nreplicas int32) error {
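The fallback relies on pointer.Int32PtrDerefOr from k8s.io/utils/pointer, which dereferences the pointer when it is non-nil and otherwise returns the supplied default; the machineSet variant below applies the same pattern. Here is a tiny self-contained illustration of that nil-handling, using a local helper with the same semantics so it runs without the k8s.io/utils dependency.

package main

import "fmt"

// int32PtrDerefOr mirrors pointer.Int32PtrDerefOr from k8s.io/utils/pointer:
// return *ptr when ptr is non-nil, otherwise return def.
func int32PtrDerefOr(ptr *int32, def int32) int32 {
	if ptr != nil {
		return *ptr
	}
	return def
}

func main() {
	statusReplicas := int32(4)

	// spec.replicas unset: fall back to status.replicas, as in the diff above.
	var specReplicas *int32
	fmt.Println(int32PtrDerefOr(specReplicas, statusReplicas)) // 4

	// spec.replicas set explicitly (including zero): the spec value wins.
	zero := int32(0)
	fmt.Println(int32PtrDerefOr(&zero, statusReplicas)) // 0
}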


@ -24,6 +24,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog"
"k8s.io/utils/pointer"
)
@ -60,8 +61,23 @@ func (r machineSetScalableResource) Nodes() ([]string, error) {
return r.controller.machineSetProviderIDs(r.machineSet)
}
func (r machineSetScalableResource) Replicas() int32 {
return pointer.Int32PtrDerefOr(r.machineSet.Spec.Replicas, 0)
func (r machineSetScalableResource) Replicas() (int32, error) {
freshMachineSet, err := r.controller.getMachineSet(r.machineSet.Namespace, r.machineSet.Name, metav1.GetOptions{})
if err != nil {
return 0, err
}
if freshMachineSet == nil {
return 0, fmt.Errorf("unknown machineSet %s", r.machineSet.Name)
}
if freshMachineSet.Spec.Replicas == nil {
klog.Warningf("MachineSet %q has nil spec.replicas. This is unsupported behaviour. Falling back to status.replicas.", r.machineSet.Name)
}
// If no value for replicas on the MachineSet spec, fall back to the status
// TODO: Remove this fallback once defaulting is implemented for MachineSet Replicas
return pointer.Int32PtrDerefOr(freshMachineSet.Spec.Replicas, freshMachineSet.Status.Replicas), nil
}
func (r machineSetScalableResource) SetSize(nreplicas int32) error {


@ -0,0 +1,150 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package clusterapi
import (
"context"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
func TestSetSize(t *testing.T) {
initialReplicas := int32(1)
updatedReplicas := int32(5)
testConfig := createMachineSetTestConfig(testNamespace, int(initialReplicas), nil)
controller, stop := mustCreateTestController(t, testConfig)
defer stop()
sr, err := newMachineSetScalableResource(controller, testConfig.machineSet)
if err != nil {
t.Fatal(err)
}
err = sr.SetSize(updatedReplicas)
if err != nil {
t.Fatal(err)
}
// fetch machineSet
u, err := sr.controller.dynamicclient.Resource(*sr.controller.machineSetResource).Namespace(testConfig.machineSet.Namespace).
Get(context.TODO(), testConfig.machineSet.Name, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
replicas, found, err := unstructured.NestedInt64(u.Object, "spec", "replicas")
if err != nil {
t.Fatal(err)
}
if !found {
t.Fatal("spec.replicas not found")
}
got := int32(replicas)
if got != updatedReplicas {
t.Errorf("expected %v, got: %v", updatedReplicas, got)
}
}
func TestReplicas(t *testing.T) {
initialReplicas := int32(1)
updatedReplicas := int32(5)
testConfig := createMachineSetTestConfig(testNamespace, int(initialReplicas), nil)
controller, stop := mustCreateTestController(t, testConfig)
defer stop()
sr, err := newMachineSetScalableResource(controller, testConfig.machineSet)
if err != nil {
t.Fatal(err)
}
i, err := sr.Replicas()
if err != nil {
t.Fatal(err)
}
if i != initialReplicas {
t.Errorf("expected %v, got: %v", initialReplicas, i)
}
// fetch and update machineSet
u, err := sr.controller.dynamicclient.Resource(*sr.controller.machineSetResource).Namespace(testConfig.machineSet.Namespace).
Get(context.TODO(), testConfig.machineSet.Name, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
if err := unstructured.SetNestedField(u.Object, int64(updatedReplicas), "spec", "replicas"); err != nil {
t.Fatal(err)
}
_, err = sr.controller.dynamicclient.Resource(*sr.controller.machineSetResource).Namespace(u.GetNamespace()).
Update(context.TODO(), u, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}
i, err = sr.Replicas()
if err != nil {
t.Fatal(err)
}
if i != updatedReplicas {
t.Errorf("expected %v, got: %v", updatedReplicas, i)
}
}
func TestSetSizeAndReplicas(t *testing.T) {
initialReplicas := int32(1)
updatedReplicas := int32(5)
testConfig := createMachineSetTestConfig(testNamespace, int(initialReplicas), nil)
controller, stop := mustCreateTestController(t, testConfig)
defer stop()
sr, err := newMachineSetScalableResource(controller, testConfig.machineSet)
if err != nil {
t.Fatal(err)
}
i, err := sr.Replicas()
if err != nil {
t.Fatal(err)
}
if i != initialReplicas {
t.Errorf("expected %v, got: %v", initialReplicas, i)
}
err = sr.SetSize(updatedReplicas)
if err != nil {
t.Fatal(err)
}
i, err = sr.Replicas()
if err != nil {
t.Fatal(err)
}
if i != updatedReplicas {
t.Errorf("expected %v, got: %v", updatedReplicas, i)
}
}


@ -59,7 +59,11 @@ func (ng *nodegroup) MaxSize() int {
// (new nodes finish startup and registration or removed nodes are
// deleted completely). Implementation required.
func (ng *nodegroup) TargetSize() (int, error) {
return int(ng.scalableResource.Replicas()), nil
size, err := ng.scalableResource.Replicas()
if err != nil {
return 0, err
}
return int(size), nil
}
// IncreaseSize increases the size of the node group. To delete a node
@ -70,11 +74,17 @@ func (ng *nodegroup) IncreaseSize(delta int) error {
if delta <= 0 {
return fmt.Errorf("size increase must be positive")
}
size := int(ng.scalableResource.Replicas())
if size+delta > ng.MaxSize() {
return fmt.Errorf("size increase too large - desired:%d max:%d", size+delta, ng.MaxSize())
size, err := ng.scalableResource.Replicas()
if err != nil {
return err
}
return ng.scalableResource.SetSize(int32(size + delta))
intSize := int(size)
if intSize+delta > ng.MaxSize() {
return fmt.Errorf("size increase too large - desired:%d max:%d", intSize+delta, ng.MaxSize())
}
return ng.scalableResource.SetSize(int32(intSize + delta))
}
// DeleteNodes deletes nodes from this node group. Error is returned
@ -82,6 +92,19 @@ func (ng *nodegroup) IncreaseSize(delta int) error {
// group. This function should wait until node group size is updated.
// Implementation required.
func (ng *nodegroup) DeleteNodes(nodes []*corev1.Node) error {
ng.machineController.accessLock.Lock()
defer ng.machineController.accessLock.Unlock()
replicas, err := ng.scalableResource.Replicas()
if err != nil {
return err
}
// if we are at minSize already we fail early.
if int(replicas) <= ng.MinSize() {
return fmt.Errorf("min size reached, nodes will not be deleted")
}
// Step 1: Verify all nodes belong to this node group.
for _, node := range nodes {
actualNodeGroup, err := ng.machineController.nodeGroupForNode(node)
@ -99,12 +122,10 @@ func (ng *nodegroup) DeleteNodes(nodes []*corev1.Node) error {
}
// Step 2: if deleting len(nodes) would make the replica count
// <= 0, then the request to delete that many nodes is bogus
// < minSize, then the request to delete that many nodes is bogus
// and we fail fast.
replicas := ng.scalableResource.Replicas()
if replicas-int32(len(nodes)) <= 0 {
return fmt.Errorf("unable to delete %d machines in %q, machine replicas are <= 0 ", len(nodes), ng.Id())
if replicas-int32(len(nodes)) < int32(ng.MinSize()) {
return fmt.Errorf("unable to delete %d machines in %q, machine replicas are %q, minSize is %q ", len(nodes), ng.Id(), replicas, ng.MinSize())
}
// Step 3: annotate the corresponding machine that it is a
@ -184,7 +205,11 @@ func (ng *nodegroup) Id() string {
// Debug returns a string containing all information regarding this node group.
func (ng *nodegroup) Debug() string {
return fmt.Sprintf(debugFormat, ng.Id(), ng.MinSize(), ng.MaxSize(), ng.scalableResource.Replicas())
replicas, err := ng.scalableResource.Replicas()
if err != nil {
return fmt.Sprintf("%s (min: %d, max: %d, replicas: %v)", ng.Id(), ng.MinSize(), ng.MaxSize(), err)
}
return fmt.Sprintf(debugFormat, ng.Id(), ng.MinSize(), ng.MaxSize(), replicas)
}
// Nodes returns a list of all nodes that belong to this node group.


@ -17,6 +17,7 @@ limitations under the License.
package clusterapi
import (
"context"
"fmt"
"path"
"sort"
@ -24,7 +25,10 @@ import (
"testing"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/utils/pointer"
)
@ -108,7 +112,7 @@ func TestNodeGroupNewNodeGroupConstructor(t *testing.T) {
}
test := func(t *testing.T, tc testCase, testConfig *testConfig) {
controller, stop := mustCreateTestController(t)
controller, stop := mustCreateTestController(t, testConfig)
defer stop()
ng, err := newNodeGroup(t, controller, testConfig)
@ -429,12 +433,22 @@ func TestNodeGroupDecreaseTargetSize(t *testing.T) {
switch v := (ng.scalableResource).(type) {
case *machineSetScalableResource:
testConfig.machineSet.Spec.Replicas = int32ptr(*testConfig.machineSet.Spec.Replicas + tc.targetSizeIncrement)
if err := controller.machineSetInformer.Informer().GetStore().Add(newUnstructuredFromMachineSet(testConfig.machineSet)); err != nil {
u := newUnstructuredFromMachineSet(testConfig.machineSet)
if err := controller.machineSetInformer.Informer().GetStore().Add(u); err != nil {
t.Fatalf("failed to add new machine: %v", err)
}
_, err := controller.dynamicclient.Resource(*controller.machineSetResource).Namespace(u.GetNamespace()).Update(context.TODO(), u, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("failed to updating machine: %v", err)
}
case *machineDeploymentScalableResource:
testConfig.machineDeployment.Spec.Replicas = int32ptr(*testConfig.machineDeployment.Spec.Replicas + tc.targetSizeIncrement)
if err := controller.machineDeploymentInformer.Informer().GetStore().Add(newUnstructuredFromMachineDeployment(testConfig.machineDeployment)); err != nil {
u := newUnstructuredFromMachineDeployment(testConfig.machineDeployment)
if err := controller.machineDeploymentInformer.Informer().GetStore().Add(u); err != nil {
t.Fatalf("failed to add new machine deployment: %v", err)
}
_, err := controller.dynamicclient.Resource(*controller.machineDeploymentResource).Namespace(u.GetNamespace()).Update(context.TODO(), u, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("failed to updating machine: %v", err)
}
default:
t.Errorf("unexpected type: %T", v)
@ -450,6 +464,7 @@ func TestNodeGroupDecreaseTargetSize(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if currReplicas != int(tc.initial)+int(tc.targetSizeIncrement) {
t.Errorf("initially expected %v, got %v", tc.initial, currReplicas)
}
@ -806,16 +821,29 @@ func TestNodeGroupMachineSetDeleteNodesWithMismatchedNodes(t *testing.T) {
}
func TestNodeGroupDeleteNodesTwice(t *testing.T) {
addDeletionTimestamp := func(t *testing.T, controller *machineController, machine *Machine) error {
addDeletionTimestampToMachine := func(t *testing.T, controller *machineController, node *corev1.Node) error {
m, err := controller.findMachineByProviderID(normalizedProviderString(node.Spec.ProviderID))
if err != nil {
return err
}
// Simulate delete that would have happened if the
// Machine API controllers were running. Don't actually
// delete since the fake client does not support
// finalizers.
now := v1.Now()
machine.DeletionTimestamp = &now
return controller.machineInformer.Informer().GetStore().Update(newUnstructuredFromMachine(machine))
m.DeletionTimestamp = &now
if _, err := controller.dynamicclient.Resource(*controller.machineResource).Namespace(m.GetNamespace()).Update(context.Background(), newUnstructuredFromMachine(m), v1.UpdateOptions{}); err != nil {
return err
}
return nil
}
// This is the size we expect the NodeGroup to be after we have called DeleteNodes.
// We need at least 8 nodes for this test to be valid.
expectedSize := 7
test := func(t *testing.T, testConfig *testConfig) {
controller, stop := mustCreateTestController(t, testConfig)
defer stop()
@ -835,6 +863,17 @@ func TestNodeGroupDeleteNodesTwice(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
// Check that the test case is valid before executing DeleteNodes
// 1. We must have at least 1 more node than the expected size otherwise DeleteNodes is a no-op
// 2. MinSize must be less than the expected size otherwise a second call to DeleteNodes may
// not make the nodegroup size less than the expected size.
if len(nodeNames) <= expectedSize {
t.Fatalf("expected more nodes than the expected size: %d <= %d", len(nodeNames), expectedSize)
}
if ng.MinSize() >= expectedSize {
t.Fatalf("expected min size to be less than expected size: %d >= %d", ng.MinSize(), expectedSize)
}
if len(nodeNames) != len(testConfig.nodes) {
t.Fatalf("expected len=%v, got len=%v", len(testConfig.nodes), len(nodeNames))
}
@ -849,55 +888,41 @@ func TestNodeGroupDeleteNodesTwice(t *testing.T) {
}
}
// These are the nodes which are over the final expectedSize
nodesToBeDeleted := testConfig.nodes[expectedSize:]
// Assert that we have no DeletionTimestamp
for i := 7; i < len(testConfig.machines); i++ {
for i := expectedSize; i < len(testConfig.machines); i++ {
if !testConfig.machines[i].ObjectMeta.DeletionTimestamp.IsZero() {
t.Fatalf("unexpected DeletionTimestamp")
}
}
if err := ng.DeleteNodes(testConfig.nodes[7:]); err != nil {
// Delete all nodes over the expectedSize
if err := ng.DeleteNodes(nodesToBeDeleted); err != nil {
t.Fatalf("unexpected error: %v", err)
}
for i := 7; i < len(testConfig.machines); i++ {
if err := addDeletionTimestamp(t, controller, testConfig.machines[i]); err != nil {
for _, node := range nodesToBeDeleted {
if err := addDeletionTimestampToMachine(t, controller, node); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if testConfig.machines[i].ObjectMeta.DeletionTimestamp.IsZero() {
t.Fatalf("expected a DeletionTimestamp")
}
}
// TODO(frobware) We have a flaky test here because we
// just called Delete and Update and the next call to
// controller.nodeGroups() will sometimes get stale
// objects from the (fakeclient) store. To fix this we
// should update the test machinery so that individual
// tests can have callbacks on Add/Update/Delete on
// each of the respective informers. We should then
// override those callbacks here in this test to add
// rendezvous points so that we wait until all objects
// have been updated before we go and get them again.
//
// Running this test with a 500ms duration I see:
//
// $ ./stress ./openshiftmachineapi.test -test.run TestNodeGroupDeleteNodesTwice -test.count 5 | ts | ts -i
// 00:00:05 Feb 27 14:29:36 0 runs so far, 0 failures
// 00:00:05 Feb 27 14:29:41 8 runs so far, 0 failures
// 00:00:05 Feb 27 14:29:46 16 runs so far, 0 failures
// 00:00:05 Feb 27 14:29:51 24 runs so far, 0 failures
// 00:00:05 Feb 27 14:29:56 32 runs so far, 0 failures
// ...
// 00:00:05 Feb 27 14:31:01 112 runs so far, 0 failures
// 00:00:05 Feb 27 14:31:06 120 runs so far, 0 failures
// 00:00:05 Feb 27 14:31:11 128 runs so far, 0 failures
// 00:00:05 Feb 27 14:31:16 136 runs so far, 0 failures
// 00:00:05 Feb 27 14:31:21 144 runs so far, 0 failures
//
// To make sure we don't run into any flakes in CI
// I've chosen to make this sleep duration 3s.
time.Sleep(3 * time.Second)
// Wait for the machineset to have been updated
if err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (bool, error) {
nodegroups, err = controller.nodeGroups()
if err != nil {
return false, err
}
targetSize, err := nodegroups[0].TargetSize()
if err != nil {
return false, err
}
return targetSize == expectedSize, nil
}); err != nil {
t.Fatalf("unexpected error waiting for nodegroup to be expected size: %v", err)
}
nodegroups, err = controller.nodeGroups()
if err != nil {
@ -906,22 +931,58 @@ func TestNodeGroupDeleteNodesTwice(t *testing.T) {
ng = nodegroups[0]
// Attempt to delete the nodes again which verifies
// that nodegroup.DeleteNodes() skips over nodes that
// have a non-nil DeletionTimestamp value.
if err := ng.DeleteNodes(testConfig.nodes[7:]); err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Check the nodegroup is at the expected size
actualSize, err := ng.TargetSize()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
expectedSize := len(testConfig.machines) - len(testConfig.machines[7:])
if actualSize != expectedSize {
t.Fatalf("expected %d nodes, got %d", expectedSize, actualSize)
}
// Check that the machines deleted in the last run have DeletionTimestamp's
// when fetched from the API
for _, node := range nodesToBeDeleted {
// Ensure the update has propagated
if err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (bool, error) {
m, err := controller.findMachineByProviderID(normalizedProviderString(node.Spec.ProviderID))
if err != nil {
return false, err
}
return !m.GetDeletionTimestamp().IsZero(), nil
}); err != nil {
t.Fatalf("unexpected error waiting for machine to have deletion timestamp: %v", err)
}
}
// Attempt to delete the nodes again which verifies
// that nodegroup.DeleteNodes() skips over nodes that
// have a non-nil DeletionTimestamp value.
if err := ng.DeleteNodes(nodesToBeDeleted); err != nil {
t.Fatalf("unexpected error: %v", err)
}
switch v := (ng.scalableResource).(type) {
case *machineSetScalableResource:
updatedMachineSet, err := controller.getMachineSet(testConfig.machineSet.Namespace, testConfig.machineSet.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if actual := pointer.Int32PtrDerefOr(updatedMachineSet.Spec.Replicas, 0); int(actual) != expectedSize {
t.Fatalf("expected %v nodes, got %v", expectedSize, actual)
}
case *machineDeploymentScalableResource:
updatedMachineDeployment, err := controller.getMachineDeployment(testConfig.machineDeployment.Namespace, testConfig.machineDeployment.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if actual := pointer.Int32PtrDerefOr(updatedMachineDeployment.Spec.Replicas, 0); int(actual) != expectedSize {
t.Fatalf("expected %v nodes, got %v", expectedSize, actual)
}
default:
t.Errorf("unexpected type: %T", v)
}
}
// Note: 10 is an upper bound for the number of nodes/replicas


@ -42,7 +42,7 @@ type scalableResource interface {
SetSize(nreplicas int32) error
// Replicas returns the current replica count of the resource
Replicas() int32
Replicas() (int32, error)
// MarkMachineForDeletion marks machine for deletion
MarkMachineForDeletion(machine *Machine) error


@ -36,7 +36,11 @@ type MachineDeploymentSpec struct {
}
// MachineDeploymentStatus is the internal autoscaler Schema for MachineDeploymentStatus
type MachineDeploymentStatus struct{}
type MachineDeploymentStatus struct {
// Replicas is the most recently observed number of replicas
// for this MachineDeployment.
Replicas int32 `json:"replicas,omitempty"`
}
// MachineDeployment is the internal autoscaler Schema for MachineDeployment
type MachineDeployment struct {


@ -68,7 +68,10 @@ type MachineTemplateSpec struct {
}
// MachineSetStatus is the internal autoscaler Schema for MachineSetStatus
type MachineSetStatus struct{}
type MachineSetStatus struct {
// Replicas is the most recently observed number of replicas.
Replicas int32 `json:"replicas"`
}
// MachineSetList is the internal autoscaler Schema for MachineSetList
type MachineSetList struct {