mirror of https://github.com/kubernetes/kops.git
reconcile: wait for apiserver to respond before trying rolling-update
The rolling-update requires the apiserver (when called without --cloudonly), so reconcile should wait for the apiserver to start responding. Implement this by reusing "validate cluster", but filtering to only the instance groups and pods that we expect to be online.
This commit is contained in:
parent
2b133b2503
commit
f2d4eeb104
|
|
@ -258,7 +258,7 @@ func RunDeleteInstance(ctx context.Context, f *util.Factory, out io.Writer, opti
|
|||
|
||||
var clusterValidator validation.ClusterValidator
|
||||
if !options.CloudOnly {
|
||||
clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, restConfig, k8sClient)
|
||||
clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, nil, nil, restConfig, k8sClient)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot create cluster validator: %v", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,8 +20,10 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/kops/cmd/kops/util"
|
||||
"k8s.io/kops/pkg/apis/kops"
|
||||
"k8s.io/kops/pkg/commands/commandutils"
|
||||
|
|
@ -134,6 +136,30 @@ func RunReconcileCluster(ctx context.Context, f *util.Factory, out io.Writer, c
|
|||
}
|
||||
}
|
||||
|
||||
// Particularly for a new cluster, we need to wait for the control plane to be answering requests
|
||||
// before we can do a rolling update.
|
||||
fmt.Fprintf(out, "Waiting for the kubernetes API to be served\n")
|
||||
{
|
||||
opt := &ValidateClusterOptions{}
|
||||
opt.InitDefaults()
|
||||
opt.ClusterName = c.ClusterName
|
||||
opt.wait = 10 * time.Minute
|
||||
|
||||
// filter the instance group to only include the control plane
|
||||
opt.filterInstanceGroups = func(ig *kops.InstanceGroup) bool {
|
||||
return ig.Spec.Role == kops.InstanceGroupRoleAPIServer || ig.Spec.Role == kops.InstanceGroupRoleControlPlane
|
||||
}
|
||||
|
||||
// Ignore all pods, we just want to check the control plane is responding
|
||||
opt.filterPodsForValidation = func(pod *v1.Pod) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
if _, err := RunValidateCluster(ctx, f, out, opt); err != nil {
|
||||
return fmt.Errorf("waiting for kubernetes API to be served: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Fprintf(out, "Doing rolling-update for control plane\n")
|
||||
{
|
||||
opt := &RollingUpdateOptions{}
|
||||
|
|
|
|||
|
|
@ -453,7 +453,7 @@ func RunRollingUpdateCluster(ctx context.Context, f *util.Factory, out io.Writer
|
|||
return fmt.Errorf("getting rest config: %w", err)
|
||||
}
|
||||
|
||||
clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, restConfig, k8sClient)
|
||||
clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, nil, nil, restConfig, k8sClient)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot create cluster validator: %v", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kops/cmd/kops/util"
|
||||
"k8s.io/kops/pkg/apis/kops"
|
||||
kopsapi "k8s.io/kops/pkg/apis/kops"
|
||||
"k8s.io/kops/pkg/validation"
|
||||
"k8s.io/kops/util/pkg/tables"
|
||||
|
|
@ -62,11 +63,18 @@ var (
|
|||
|
||||
type ValidateClusterOptions struct {
|
||||
ClusterName string
|
||||
InstanceGroupRoles []kops.InstanceGroupRole
|
||||
output string
|
||||
wait time.Duration
|
||||
count int
|
||||
interval time.Duration
|
||||
kubeconfig string
|
||||
|
||||
// filterInstanceGroups is a function that returns true if the instance group should be validated
|
||||
filterInstanceGroups func(ig *kops.InstanceGroup) bool
|
||||
|
||||
// filterPodsForValidation is a function that returns true if the pod should be validated
|
||||
filterPodsForValidation func(pod *v1.Pod) bool
|
||||
}
|
||||
|
||||
func (o *ValidateClusterOptions) InitDefaults() {
|
||||
|
|
@ -164,7 +172,7 @@ func RunValidateCluster(ctx context.Context, f *util.Factory, out io.Writer, opt
|
|||
|
||||
timeout := time.Now().Add(options.wait)
|
||||
|
||||
validator, err := validation.NewClusterValidator(cluster, cloud, list, restConfig, k8sClient)
|
||||
validator, err := validation.NewClusterValidator(cluster, cloud, list, options.filterInstanceGroups, options.filterPodsForValidation, restConfig, k8sClient)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unexpected error creating validatior: %v", err)
|
||||
}
|
||||
|
|
@ -175,7 +183,7 @@ func RunValidateCluster(ctx context.Context, f *util.Factory, out io.Writer, opt
|
|||
return nil, fmt.Errorf("wait time exceeded during validation")
|
||||
}
|
||||
|
||||
result, err := validator.Validate()
|
||||
result, err := validator.Validate(ctx)
|
||||
if err != nil {
|
||||
consecutive = 0
|
||||
if options.wait > 0 {
|
||||
|
|
|
|||
|
|
@ -537,7 +537,7 @@ func (c *RollingUpdateCluster) validateClusterWithTimeout(validateCount int, gro
|
|||
|
||||
for {
|
||||
// Note that we validate at least once before checking the timeout, in case the cluster is healthy with a short timeout
|
||||
result, err := c.ClusterValidator.Validate()
|
||||
result, err := c.ClusterValidator.Validate(ctx)
|
||||
if err == nil && !hasFailureRelevantToGroup(result.Failures, group) {
|
||||
successCount++
|
||||
if successCount >= validateCount {
|
||||
|
|
|
|||
|
|
@ -84,13 +84,13 @@ func getTestSetup() (*RollingUpdateCluster, *awsup.MockAWSCloud) {
|
|||
|
||||
type successfulClusterValidator struct{}
|
||||
|
||||
func (*successfulClusterValidator) Validate() (*validation.ValidationCluster, error) {
|
||||
func (*successfulClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
|
||||
return &validation.ValidationCluster{}, nil
|
||||
}
|
||||
|
||||
type failingClusterValidator struct{}
|
||||
|
||||
func (*failingClusterValidator) Validate() (*validation.ValidationCluster, error) {
|
||||
func (*failingClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
|
||||
return &validation.ValidationCluster{
|
||||
Failures: []*validation.ValidationError{
|
||||
{
|
||||
|
|
@ -104,7 +104,7 @@ func (*failingClusterValidator) Validate() (*validation.ValidationCluster, error
|
|||
|
||||
type erroringClusterValidator struct{}
|
||||
|
||||
func (*erroringClusterValidator) Validate() (*validation.ValidationCluster, error) {
|
||||
func (*erroringClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
|
||||
return nil, errors.New("testing validation error")
|
||||
}
|
||||
|
||||
|
|
@ -113,7 +113,7 @@ type instanceGroupNodeSpecificErrorClusterValidator struct {
|
|||
InstanceGroup *kopsapi.InstanceGroup
|
||||
}
|
||||
|
||||
func (igErrorValidator *instanceGroupNodeSpecificErrorClusterValidator) Validate() (*validation.ValidationCluster, error) {
|
||||
func (igErrorValidator *instanceGroupNodeSpecificErrorClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
|
||||
return &validation.ValidationCluster{
|
||||
Failures: []*validation.ValidationError{
|
||||
{
|
||||
|
|
@ -130,7 +130,7 @@ type assertNotCalledClusterValidator struct {
|
|||
T *testing.T
|
||||
}
|
||||
|
||||
func (v *assertNotCalledClusterValidator) Validate() (*validation.ValidationCluster, error) {
|
||||
func (v *assertNotCalledClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
|
||||
v.T.Fatal("validator called unexpectedly")
|
||||
return nil, errors.New("validator called unexpectedly")
|
||||
}
|
||||
|
|
@ -425,8 +425,7 @@ type failAfterOneNodeClusterValidator struct {
|
|||
ReturnError bool
|
||||
}
|
||||
|
||||
func (v *failAfterOneNodeClusterValidator) Validate() (*validation.ValidationCluster, error) {
|
||||
ctx := context.TODO()
|
||||
func (v *failAfterOneNodeClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
|
||||
asgGroups, _ := v.Cloud.Autoscaling().DescribeAutoScalingGroups(ctx, &autoscaling.DescribeAutoScalingGroupsInput{
|
||||
AutoScalingGroupNames: []string{v.Group},
|
||||
})
|
||||
|
|
@ -648,8 +647,7 @@ type flappingClusterValidator struct {
|
|||
invocationCount int
|
||||
}
|
||||
|
||||
func (v *flappingClusterValidator) Validate() (*validation.ValidationCluster, error) {
|
||||
ctx := context.TODO()
|
||||
func (v *flappingClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
|
||||
asgGroups, _ := v.Cloud.Autoscaling().DescribeAutoScalingGroups(ctx, &autoscaling.DescribeAutoScalingGroupsInput{
|
||||
AutoScalingGroupNames: []string{"master-1"},
|
||||
})
|
||||
|
|
@ -706,7 +704,7 @@ type failThreeTimesClusterValidator struct {
|
|||
invocationCount int
|
||||
}
|
||||
|
||||
func (v *failThreeTimesClusterValidator) Validate() (*validation.ValidationCluster, error) {
|
||||
func (v *failThreeTimesClusterValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
|
||||
v.invocationCount++
|
||||
if v.invocationCount <= 3 {
|
||||
return &validation.ValidationCluster{
|
||||
|
|
@ -1060,7 +1058,7 @@ type concurrentTest struct {
|
|||
detached map[string]bool
|
||||
}
|
||||
|
||||
func (c *concurrentTest) Validate() (*validation.ValidationCluster, error) {
|
||||
func (c *concurrentTest) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
|
|
@ -1441,7 +1439,7 @@ type alreadyDetachedTest struct {
|
|||
detached map[string]bool
|
||||
}
|
||||
|
||||
func (t *alreadyDetachedTest) Validate() (*validation.ValidationCluster, error) {
|
||||
func (t *alreadyDetachedTest) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
|
||||
t.mutex.Lock()
|
||||
defer t.mutex.Unlock()
|
||||
|
||||
|
|
|
|||
|
|
@ -75,7 +75,7 @@ type countingValidator struct {
|
|||
numValidations int
|
||||
}
|
||||
|
||||
func (c *countingValidator) Validate() (*validation.ValidationCluster, error) {
|
||||
func (c *countingValidator) Validate(ctx context.Context) (*validation.ValidationCluster, error) {
|
||||
c.numValidations++
|
||||
return &validation.ValidationCluster{}, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -56,15 +56,23 @@ type ValidationError struct {
|
|||
|
||||
type ClusterValidator interface {
|
||||
// Validate validates a k8s cluster
|
||||
Validate() (*ValidationCluster, error)
|
||||
Validate(ctx context.Context) (*ValidationCluster, error)
|
||||
}
|
||||
|
||||
type clusterValidatorImpl struct {
|
||||
cluster *kops.Cluster
|
||||
cloud fi.Cloud
|
||||
instanceGroups []*kops.InstanceGroup
|
||||
restConfig *rest.Config
|
||||
k8sClient kubernetes.Interface
|
||||
|
||||
// allInstanceGroups is the list of all instance groups in the cluster
|
||||
allInstanceGroups []*kops.InstanceGroup
|
||||
|
||||
// filterInstanceGroups is a function that returns true if the instance group should be validated
|
||||
filterInstanceGroups func(ig *kops.InstanceGroup) bool
|
||||
|
||||
// filterPodsForValidation is a function that returns true if the pod should be validated
|
||||
filterPodsForValidation func(pod *v1.Pod) bool
|
||||
}
|
||||
|
||||
func (v *ValidationCluster) addError(failure *ValidationError) {
|
||||
|
|
@ -101,30 +109,44 @@ func hasPlaceHolderIP(host string) (string, error) {
|
|||
return "", nil
|
||||
}
|
||||
|
||||
func NewClusterValidator(cluster *kops.Cluster, cloud fi.Cloud, instanceGroupList *kops.InstanceGroupList, restConfig *rest.Config, k8sClient kubernetes.Interface) (ClusterValidator, error) {
|
||||
var instanceGroups []*kops.InstanceGroup
|
||||
func NewClusterValidator(cluster *kops.Cluster, cloud fi.Cloud, instanceGroupList *kops.InstanceGroupList, filterInstanceGroups func(ig *kops.InstanceGroup) bool, filterPodsForValidation func(pod *v1.Pod) bool, restConfig *rest.Config, k8sClient kubernetes.Interface) (ClusterValidator, error) {
|
||||
var allInstanceGroups []*kops.InstanceGroup
|
||||
|
||||
for i := range instanceGroupList.Items {
|
||||
ig := &instanceGroupList.Items[i]
|
||||
instanceGroups = append(instanceGroups, ig)
|
||||
allInstanceGroups = append(allInstanceGroups, ig)
|
||||
}
|
||||
|
||||
if len(instanceGroups) == 0 {
|
||||
if len(allInstanceGroups) == 0 {
|
||||
return nil, fmt.Errorf("no InstanceGroup objects found")
|
||||
}
|
||||
|
||||
// If no filter is provided, validate all instance groups
|
||||
if filterInstanceGroups == nil {
|
||||
filterInstanceGroups = func(ig *kops.InstanceGroup) bool {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// If no filter is provided, validate all pods
|
||||
if filterPodsForValidation == nil {
|
||||
filterPodsForValidation = func(pod *v1.Pod) bool {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return &clusterValidatorImpl{
|
||||
cluster: cluster,
|
||||
cloud: cloud,
|
||||
instanceGroups: instanceGroups,
|
||||
allInstanceGroups: allInstanceGroups,
|
||||
restConfig: restConfig,
|
||||
k8sClient: k8sClient,
|
||||
filterInstanceGroups: filterInstanceGroups,
|
||||
filterPodsForValidation: filterPodsForValidation,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (v *clusterValidatorImpl) Validate() (*ValidationCluster, error) {
|
||||
ctx := context.TODO()
|
||||
|
||||
func (v *clusterValidatorImpl) Validate(ctx context.Context) (*ValidationCluster, error) {
|
||||
validation := &ValidationCluster{}
|
||||
|
||||
// Do not use if we are running gossip or without dns
|
||||
|
|
@ -161,13 +183,14 @@ func (v *clusterValidatorImpl) Validate() (*ValidationCluster, error) {
|
|||
}
|
||||
|
||||
warnUnmatched := false
|
||||
cloudGroups, err := v.cloud.GetCloudGroups(v.cluster, v.instanceGroups, warnUnmatched, nodeList.Items)
|
||||
cloudGroups, err := v.cloud.GetCloudGroups(v.cluster, v.allInstanceGroups, warnUnmatched, nodeList.Items)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
readyNodes, nodeInstanceGroupMapping := validation.validateNodes(cloudGroups, v.instanceGroups)
|
||||
|
||||
if err := validation.collectPodFailures(ctx, v.k8sClient, readyNodes, nodeInstanceGroupMapping); err != nil {
|
||||
readyNodes, nodeInstanceGroupMapping := validation.validateNodes(cloudGroups, v.allInstanceGroups, v.filterInstanceGroups)
|
||||
|
||||
if err := validation.collectPodFailures(ctx, v.k8sClient, readyNodes, nodeInstanceGroupMapping, v.filterPodsForValidation); err != nil {
|
||||
return nil, fmt.Errorf("cannot get pod health for %q: %v", v.cluster.Name, err)
|
||||
}
|
||||
|
||||
|
|
@ -181,7 +204,7 @@ var masterStaticPods = []string{
|
|||
}
|
||||
|
||||
func (v *ValidationCluster) collectPodFailures(ctx context.Context, client kubernetes.Interface, nodes []v1.Node,
|
||||
nodeInstanceGroupMapping map[string]*kops.InstanceGroup,
|
||||
nodeInstanceGroupMapping map[string]*kops.InstanceGroup, podValidationFilter func(pod *v1.Pod) bool,
|
||||
) error {
|
||||
masterWithoutPod := map[string]map[string]bool{}
|
||||
nodeByAddress := map[string]string{}
|
||||
|
|
@ -210,10 +233,16 @@ func (v *ValidationCluster) collectPodFailures(ctx context.Context, client kuber
|
|||
delete(masterWithoutPod[nodeByAddress[pod.Status.HostIP]], app)
|
||||
}
|
||||
|
||||
// Ignore pods that we don't want to validate
|
||||
if !podValidationFilter(pod) {
|
||||
return nil
|
||||
}
|
||||
|
||||
priority := pod.Spec.PriorityClassName
|
||||
if priority != "system-cluster-critical" && priority != "system-node-critical" {
|
||||
return nil
|
||||
}
|
||||
|
||||
if pod.Status.Phase == v1.PodSucceeded {
|
||||
return nil
|
||||
}
|
||||
|
|
@ -275,12 +304,16 @@ func (v *ValidationCluster) collectPodFailures(ctx context.Context, client kuber
|
|||
return nil
|
||||
}
|
||||
|
||||
func (v *ValidationCluster) validateNodes(cloudGroups map[string]*cloudinstances.CloudInstanceGroup, groups []*kops.InstanceGroup) ([]v1.Node, map[string]*kops.InstanceGroup) {
|
||||
func (v *ValidationCluster) validateNodes(cloudGroups map[string]*cloudinstances.CloudInstanceGroup, groups []*kops.InstanceGroup, shouldValidateInstanceGroup func(ig *kops.InstanceGroup) bool) ([]v1.Node, map[string]*kops.InstanceGroup) {
|
||||
var readyNodes []v1.Node
|
||||
groupsSeen := map[string]bool{}
|
||||
nodeInstanceGroupMapping := map[string]*kops.InstanceGroup{}
|
||||
|
||||
for _, cloudGroup := range cloudGroups {
|
||||
if cloudGroup.InstanceGroup != nil && !shouldValidateInstanceGroup(cloudGroup.InstanceGroup) {
|
||||
continue
|
||||
}
|
||||
|
||||
var allMembers []*cloudinstances.CloudInstance
|
||||
allMembers = append(allMembers, cloudGroup.Ready...)
|
||||
allMembers = append(allMembers, cloudGroup.NeedUpdate...)
|
||||
|
|
@ -372,6 +405,10 @@ func (v *ValidationCluster) validateNodes(cloudGroups map[string]*cloudinstances
|
|||
}
|
||||
|
||||
for _, ig := range groups {
|
||||
if !shouldValidateInstanceGroup(ig) {
|
||||
continue
|
||||
}
|
||||
|
||||
if !groupsSeen[ig.Name] {
|
||||
v.addError(&ValidationError{
|
||||
Kind: "InstanceGroup",
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package validation
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
|
|
@ -70,6 +71,8 @@ func (c *MockCloud) GetCloudGroups(cluster *kopsapi.Cluster, instancegroups []*k
|
|||
}
|
||||
|
||||
func testValidate(t *testing.T, groups map[string]*cloudinstances.CloudInstanceGroup, objects []runtime.Object) (*ValidationCluster, error) {
|
||||
ctx := context.TODO()
|
||||
|
||||
cluster := &kopsapi.Cluster{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "testcluster.k8s.local"},
|
||||
Spec: kopsapi.ClusterSpec{
|
||||
|
|
@ -130,14 +133,16 @@ func testValidate(t *testing.T, groups map[string]*cloudinstances.CloudInstanceG
|
|||
restConfig := &rest.Config{
|
||||
Host: "https://api.testcluster.k8s.local",
|
||||
}
|
||||
validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, restConfig, fake.NewSimpleClientset(objects...))
|
||||
validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, nil, nil, restConfig, fake.NewSimpleClientset(objects...))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return validator.Validate()
|
||||
return validator.Validate(ctx)
|
||||
}
|
||||
|
||||
func Test_ValidateCloudGroupMissing(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
|
||||
cluster := &kopsapi.Cluster{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "testcluster.k8s.local"},
|
||||
Spec: kopsapi.ClusterSpec{
|
||||
|
|
@ -163,9 +168,9 @@ func Test_ValidateCloudGroupMissing(t *testing.T) {
|
|||
restConfig := &rest.Config{
|
||||
Host: "https://api.testcluster.k8s.local",
|
||||
}
|
||||
validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, restConfig, fake.NewSimpleClientset())
|
||||
validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, nil, nil, restConfig, fake.NewSimpleClientset())
|
||||
require.NoError(t, err)
|
||||
v, err := validator.Validate()
|
||||
v, err := validator.Validate(ctx)
|
||||
require.NoError(t, err)
|
||||
if !assert.Len(t, v.Failures, 1) ||
|
||||
!assert.Equal(t, &ValidationError{
|
||||
|
|
|
|||
Loading…
Reference in New Issue