From b1f8f84306d54062e37bee64cc7fb45f9f227e1d Mon Sep 17 00:00:00 2001 From: Justin SB Date: Sun, 25 Aug 2019 13:24:13 -0400 Subject: [PATCH] Code changes for 1.15 --- cmd/kops/create.go | 2 +- cmd/kops/delete.go | 2 +- cmd/kops/replace.go | 2 +- cmd/kops/rollingupdatecluster.go | 2 +- .../providers/google/clouddns/BUILD.bazel | 2 +- .../providers/google/clouddns/clouddns.go | 2 +- nodeup/pkg/model/etcd_manager_tls.go | 13 +- pkg/drain/BUILD.bazel | 30 ++ pkg/drain/README.md | 7 + pkg/drain/cordon.go | 97 ++++++ pkg/drain/default.go | 69 +++++ pkg/drain/drain.go | 292 ++++++++++++++++++ pkg/drain/filters.go | 223 +++++++++++++ pkg/instancegroups/BUILD.bazel | 3 +- pkg/instancegroups/instancegroups.go | 46 +-- pkg/model/network.go | 2 +- pkg/pkiutil/BUILD.bazel | 8 + pkg/pkiutil/pki_helpers.go | 100 ++++++ protokube/pkg/protokube/volume_mounter.go | 5 +- upup/pkg/fi/cloudup/awsup/aws_cloud.go | 2 +- 20 files changed, 862 insertions(+), 47 deletions(-) create mode 100644 pkg/drain/BUILD.bazel create mode 100644 pkg/drain/README.md create mode 100644 pkg/drain/cordon.go create mode 100644 pkg/drain/default.go create mode 100644 pkg/drain/drain.go create mode 100644 pkg/drain/filters.go create mode 100644 pkg/pkiutil/BUILD.bazel create mode 100644 pkg/pkiutil/pki_helpers.go diff --git a/cmd/kops/create.go b/cmd/kops/create.go index 511dcb4bf0..fdb3a1c443 100644 --- a/cmd/kops/create.go +++ b/cmd/kops/create.go @@ -24,7 +24,7 @@ import ( "github.com/spf13/cobra" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/cli-runtime/pkg/genericclioptions/resource" + "k8s.io/cli-runtime/pkg/resource" "k8s.io/klog" "k8s.io/kops/cmd/kops/util" kopsapi "k8s.io/kops/pkg/apis/kops" diff --git a/cmd/kops/delete.go b/cmd/kops/delete.go index 21f0fef910..5eb81c77eb 100644 --- a/cmd/kops/delete.go +++ b/cmd/kops/delete.go @@ -23,7 +23,7 @@ import ( "github.com/spf13/cobra" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/cli-runtime/pkg/genericclioptions/resource" + "k8s.io/cli-runtime/pkg/resource" "k8s.io/klog" "k8s.io/kops/cmd/kops/util" kopsapi "k8s.io/kops/pkg/apis/kops" diff --git a/cmd/kops/replace.go b/cmd/kops/replace.go index d3e3b3063a..8d1308f02a 100644 --- a/cmd/kops/replace.go +++ b/cmd/kops/replace.go @@ -23,7 +23,7 @@ import ( "github.com/spf13/cobra" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/cli-runtime/pkg/genericclioptions/resource" + "k8s.io/cli-runtime/pkg/resource" "k8s.io/klog" "k8s.io/kops/cmd/kops/util" kopsapi "k8s.io/kops/pkg/apis/kops" diff --git a/cmd/kops/rollingupdatecluster.go b/cmd/kops/rollingupdatecluster.go index 9f9f48f345..17d2568ef1 100644 --- a/cmd/kops/rollingupdatecluster.go +++ b/cmd/kops/rollingupdatecluster.go @@ -229,7 +229,7 @@ func RunRollingUpdateCluster(f *util.Factory, out io.Writer, options *RollingUpd } contextName := cluster.ObjectMeta.Name - clientGetter := genericclioptions.NewConfigFlags() + clientGetter := genericclioptions.NewConfigFlags(true) clientGetter.Context = &contextName config, err := clientGetter.ToRESTConfig() diff --git a/dnsprovider/pkg/dnsprovider/providers/google/clouddns/BUILD.bazel b/dnsprovider/pkg/dnsprovider/providers/google/clouddns/BUILD.bazel index 67d01fa88a..71de7e226c 100644 --- a/dnsprovider/pkg/dnsprovider/providers/google/clouddns/BUILD.bazel +++ b/dnsprovider/pkg/dnsprovider/providers/google/clouddns/BUILD.bazel @@ -26,7 +26,7 @@ go_library( "//vendor/google.golang.org/api/dns/v1:go_default_library", "//vendor/gopkg.in/gcfg.v1:go_default_library", "//vendor/k8s.io/klog:go_default_library", - "//vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/gce:go_default_library", + "//vendor/k8s.io/legacy-cloud-providers/gce:go_default_library", ], ) diff --git a/dnsprovider/pkg/dnsprovider/providers/google/clouddns/clouddns.go b/dnsprovider/pkg/dnsprovider/providers/google/clouddns/clouddns.go index 8a6fcf6795..51674c735e 100644 --- a/dnsprovider/pkg/dnsprovider/providers/google/clouddns/clouddns.go +++ b/dnsprovider/pkg/dnsprovider/providers/google/clouddns/clouddns.go @@ -31,7 +31,7 @@ import ( "k8s.io/kops/dnsprovider/pkg/dnsprovider" "k8s.io/kops/dnsprovider/pkg/dnsprovider/providers/google/clouddns/internal" "k8s.io/kops/dnsprovider/pkg/dnsprovider/providers/google/clouddns/internal/stubs" - "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" + "k8s.io/legacy-cloud-providers/gce" ) const ( diff --git a/nodeup/pkg/model/etcd_manager_tls.go b/nodeup/pkg/model/etcd_manager_tls.go index 5afc5fe30b..2a14fe021e 100644 --- a/nodeup/pkg/model/etcd_manager_tls.go +++ b/nodeup/pkg/model/etcd_manager_tls.go @@ -26,6 +26,7 @@ import ( certutil "k8s.io/client-go/util/cert" "k8s.io/klog" + "k8s.io/kops/pkg/pkiutil" "k8s.io/kops/upup/pkg/fi" ) @@ -105,7 +106,7 @@ func (b *EtcdManagerTLSBuilder) buildKubeAPIServerKeypair() error { { p := filepath.Join(dir, "etcd-ca.crt") - certBytes := certutil.EncodeCertPEM(etcdClientsCACertificate.Certificate) + certBytes := pkiutil.EncodeCertPEM(etcdClientsCACertificate.Certificate) if err := ioutil.WriteFile(p, certBytes, 0644); err != nil { return fmt.Errorf("error writing certificate key file %q: %v", p, err) } @@ -114,13 +115,13 @@ func (b *EtcdManagerTLSBuilder) buildKubeAPIServerKeypair() error { name := "etcd-client" humanName := dir + "/" + name - privateKey, err := certutil.NewPrivateKey() + privateKey, err := pkiutil.NewPrivateKey() if err != nil { return fmt.Errorf("unable to create private key %q: %v", humanName, err) } - privateKeyBytes := certutil.EncodePrivateKeyPEM(privateKey) + privateKeyBytes := pkiutil.EncodePrivateKeyPEM(privateKey) - certConfig := certutil.Config{ + certConfig := &certutil.Config{ CommonName: "kube-apiserver", Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}, } @@ -131,12 +132,12 @@ func (b *EtcdManagerTLSBuilder) buildKubeAPIServerKeypair() error { } klog.Infof("signing certificate for %q", humanName) - cert, err := certutil.NewSignedCert(certConfig, privateKey, etcdClientsCACertificate.Certificate, signingKey) + cert, err := pkiutil.NewSignedCert(certConfig, privateKey, etcdClientsCACertificate.Certificate, signingKey) if err != nil { return fmt.Errorf("error signing certificate for %q: %v", humanName, err) } - certBytes := certutil.EncodeCertPEM(cert) + certBytes := pkiutil.EncodeCertPEM(cert) p := filepath.Join(dir, name) { diff --git a/pkg/drain/BUILD.bazel b/pkg/drain/BUILD.bazel new file mode 100644 index 0000000000..273508b34c --- /dev/null +++ b/pkg/drain/BUILD.bazel @@ -0,0 +1,30 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = [ + "cordon.go", + "default.go", + "drain.go", + "filters.go", + ], + importpath = "k8s.io/kops/pkg/drain", + visibility = ["//visibility:public"], + deps = [ + "//vendor/k8s.io/api/apps/v1:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/api/policy/v1beta1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/json:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + ], +) diff --git a/pkg/drain/README.md b/pkg/drain/README.md new file mode 100644 index 0000000000..5110dd03ce --- /dev/null +++ b/pkg/drain/README.md @@ -0,0 +1,7 @@ +# Drain code + +This is the drain code copied from k8s.io/kubernetes, after the extraction in +https://github.com/kubernetes/kubernetes/pull/80045/files + +Once we are on that version of k/k (1.16), we can replace with the upstream +version. diff --git a/pkg/drain/cordon.go b/pkg/drain/cordon.go new file mode 100644 index 0000000000..fc33975266 --- /dev/null +++ b/pkg/drain/cordon.go @@ -0,0 +1,97 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package drain + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/client-go/kubernetes" +) + +// CordonHelper wraps functionality to cordon/uncordon nodes +type CordonHelper struct { + node *corev1.Node + desired bool +} + +// NewCordonHelper returns a new CordonHelper +func NewCordonHelper(node *corev1.Node) *CordonHelper { + return &CordonHelper{ + node: node, + } +} + +// NewCordonHelperFromRuntimeObject returns a new CordonHelper, or an error if given object is not a +// node or cannot be encoded as JSON +func NewCordonHelperFromRuntimeObject(nodeObject runtime.Object, scheme *runtime.Scheme, gvk schema.GroupVersionKind) (*CordonHelper, error) { + nodeObject, err := scheme.ConvertToVersion(nodeObject, gvk.GroupVersion()) + if err != nil { + return nil, err + } + + node, ok := nodeObject.(*corev1.Node) + if !ok { + return nil, fmt.Errorf("unexpected type %T", nodeObject) + } + + return NewCordonHelper(node), nil +} + +// UpdateIfRequired returns true if c.node.Spec.Unschedulable isn't already set, +// or false when no change is needed +func (c *CordonHelper) UpdateIfRequired(desired bool) bool { + c.desired = desired + if c.node.Spec.Unschedulable == c.desired { + return false + } + return true +} + +// PatchOrReplace uses given clientset to update the node status, either by patching or +// updating the given node object; it may return error if the object cannot be encoded as +// JSON, or if either patch or update calls fail; it will also return a second error +// whenever creating a patch has failed +func (c *CordonHelper) PatchOrReplace(clientset kubernetes.Interface) (error, error) { + client := clientset.CoreV1().Nodes() + + oldData, err := json.Marshal(c.node) + if err != nil { + return err, nil + } + + c.node.Spec.Unschedulable = c.desired + + newData, err := json.Marshal(c.node) + if err != nil { + return err, nil + } + + patchBytes, patchErr := strategicpatch.CreateTwoWayMergePatch(oldData, newData, c.node) + if patchErr == nil { + _, err = client.Patch(c.node.Name, types.StrategicMergePatchType, patchBytes) + } else { + _, err = client.Update(c.node) + } + return err, patchErr +} diff --git a/pkg/drain/default.go b/pkg/drain/default.go new file mode 100644 index 0000000000..ec0351b0ff --- /dev/null +++ b/pkg/drain/default.go @@ -0,0 +1,69 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package drain + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" +) + +// This file contains default implementations of how to +// drain/cordon/uncordon nodes. These functions may be called +// directly, or their functionality copied into your own code, for +// example if you want different output behaviour. + +// RunNodeDrain shows the canonical way to drain a node. +// You should first cordon the node, e.g. using RunCordonOrUncordon +func RunNodeDrain(drainer *Helper, nodeName string) error { + // TODO(justinsb): Ensure we have adequate e2e coverage of this function in library consumers + list, errs := drainer.GetPodsForDeletion(nodeName) + if errs != nil { + return utilerrors.NewAggregate(errs) + } + if warnings := list.Warnings(); warnings != "" { + fmt.Fprintf(drainer.ErrOut, "WARNING: %s\n", warnings) + } + + if err := drainer.DeleteOrEvictPods(list.Pods()); err != nil { + // Maybe warn about non-deleted pods here + return err + } + return nil +} + +// RunCordonOrUncordon demonstrates the canonical way to cordon or uncordon a Node +func RunCordonOrUncordon(drainer *Helper, node *corev1.Node, desired bool) error { + // TODO(justinsb): Ensure we have adequate e2e coverage of this function in library consumers + c := NewCordonHelper(node) + + if updateRequired := c.UpdateIfRequired(desired); !updateRequired { + // Already done + return nil + } + + err, patchErr := c.PatchOrReplace(drainer.Client) + if patchErr != nil { + return patchErr + } + if err != nil { + return err + } + + return nil +} diff --git a/pkg/drain/drain.go b/pkg/drain/drain.go new file mode 100644 index 0000000000..2e9c9e8c49 --- /dev/null +++ b/pkg/drain/drain.go @@ -0,0 +1,292 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package drain + +import ( + "fmt" + "io" + "math" + "time" + + corev1 "k8s.io/api/core/v1" + policyv1beta1 "k8s.io/api/policy/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" +) + +const ( + // EvictionKind represents the kind of evictions object + EvictionKind = "Eviction" + // EvictionSubresource represents the kind of evictions object as pod's subresource + EvictionSubresource = "pods/eviction" +) + +// Helper contains the parameters to control the behaviour of drainer +type Helper struct { + Client kubernetes.Interface + Force bool + GracePeriodSeconds int + IgnoreAllDaemonSets bool + Timeout time.Duration + DeleteLocalData bool + Selector string + PodSelector string + Out io.Writer + ErrOut io.Writer + + // TODO(justinsb): unnecessary? + DryRun bool + + // OnPodDeletedOrEvicted is called when a pod is evicted/deleted; for printing progress output + OnPodDeletedOrEvicted func(pod *corev1.Pod, usingEviction bool) +} + +// CheckEvictionSupport uses Discovery API to find out if the server support +// eviction subresource If support, it will return its groupVersion; Otherwise, +// it will return an empty string +func CheckEvictionSupport(clientset kubernetes.Interface) (string, error) { + discoveryClient := clientset.Discovery() + groupList, err := discoveryClient.ServerGroups() + if err != nil { + return "", err + } + foundPolicyGroup := false + var policyGroupVersion string + for _, group := range groupList.Groups { + if group.Name == "policy" { + foundPolicyGroup = true + policyGroupVersion = group.PreferredVersion.GroupVersion + break + } + } + if !foundPolicyGroup { + return "", nil + } + resourceList, err := discoveryClient.ServerResourcesForGroupVersion("v1") + if err != nil { + return "", err + } + for _, resource := range resourceList.APIResources { + if resource.Name == EvictionSubresource && resource.Kind == EvictionKind { + return policyGroupVersion, nil + } + } + return "", nil +} + +func (d *Helper) makeDeleteOptions() *metav1.DeleteOptions { + deleteOptions := &metav1.DeleteOptions{} + if d.GracePeriodSeconds >= 0 { + gracePeriodSeconds := int64(d.GracePeriodSeconds) + deleteOptions.GracePeriodSeconds = &gracePeriodSeconds + } + return deleteOptions +} + +// DeletePod will delete the given pod, or return an error if it couldn't +func (d *Helper) DeletePod(pod corev1.Pod) error { + return d.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, d.makeDeleteOptions()) +} + +// EvictPod will evict the give pod, or return an error if it couldn't +func (d *Helper) EvictPod(pod corev1.Pod, policyGroupVersion string) error { + eviction := &policyv1beta1.Eviction{ + TypeMeta: metav1.TypeMeta{ + APIVersion: policyGroupVersion, + Kind: EvictionKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + }, + DeleteOptions: d.makeDeleteOptions(), + } + // Remember to change change the URL manipulation func when Eviction's version change + return d.Client.PolicyV1beta1().Evictions(eviction.Namespace).Evict(eviction) +} + +// GetPodsForDeletion receives resource info for a node, and returns those pods as PodDeleteList, +// or error if it cannot list pods. All pods that are ready to be deleted can be obtained with .Pods(), +// and string with all warning can be obtained with .Warnings(), and .Errors() for all errors that +// occurred during deletion. +func (d *Helper) GetPodsForDeletion(nodeName string) (*podDeleteList, []error) { + labelSelector, err := labels.Parse(d.PodSelector) + if err != nil { + return nil, []error{err} + } + + podList, err := d.Client.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{ + LabelSelector: labelSelector.String(), + FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}).String()}) + if err != nil { + return nil, []error{err} + } + + pods := []podDelete{} + + for _, pod := range podList.Items { + var status podDeleteStatus + for _, filter := range d.makeFilters() { + status = filter(pod) + if !status.delete { + // short-circuit as soon as pod is filtered out + // at that point, there is no reason to run pod + // through any additional filters + break + } + } + pods = append(pods, podDelete{ + pod: pod, + status: status, + }) + } + + list := &podDeleteList{items: pods} + + if errs := list.errors(); len(errs) > 0 { + return list, errs + } + + return list, nil +} + +// DeleteOrEvictPods deletes or evicts the pods on the api server +func (d *Helper) DeleteOrEvictPods(pods []corev1.Pod) error { + if len(pods) == 0 { + return nil + } + + policyGroupVersion, err := CheckEvictionSupport(d.Client) + if err != nil { + return err + } + + // TODO(justinsb): unnecessary? + getPodFn := func(namespace, name string) (*corev1.Pod, error) { + return d.Client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{}) + } + + if len(policyGroupVersion) > 0 { + return d.evictPods(pods, policyGroupVersion, getPodFn) + } + + return d.deletePods(pods, getPodFn) +} + +func (d *Helper) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error)) error { + returnCh := make(chan error, 1) + + for _, pod := range pods { + go func(pod corev1.Pod, returnCh chan error) { + for { + fmt.Fprintf(d.Out, "evicting pod %q\n", pod.Name) + err := d.EvictPod(pod, policyGroupVersion) + if err == nil { + break + } else if apierrors.IsNotFound(err) { + returnCh <- nil + return + } else if apierrors.IsTooManyRequests(err) { + fmt.Fprintf(d.ErrOut, "error when evicting pod %q (will retry after 5s): %v\n", pod.Name, err) + time.Sleep(5 * time.Second) + } else { + returnCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err) + return + } + } + _, err := waitForDelete([]corev1.Pod{pod}, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn, d.OnPodDeletedOrEvicted) + if err == nil { + returnCh <- nil + } else { + returnCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err) + } + }(pod, returnCh) + } + + doneCount := 0 + var errors []error + + // 0 timeout means infinite, we use MaxInt64 to represent it. + var globalTimeout time.Duration + if d.Timeout == 0 { + globalTimeout = time.Duration(math.MaxInt64) + } else { + globalTimeout = d.Timeout + } + globalTimeoutCh := time.After(globalTimeout) + numPods := len(pods) + for doneCount < numPods { + select { + case err := <-returnCh: + doneCount++ + if err != nil { + errors = append(errors, err) + } + case <-globalTimeoutCh: + return fmt.Errorf("drain did not complete within %v", globalTimeout) + } + } + return utilerrors.NewAggregate(errors) +} + +func (d *Helper) deletePods(pods []corev1.Pod, getPodFn func(namespace, name string) (*corev1.Pod, error)) error { + // 0 timeout means infinite, we use MaxInt64 to represent it. + var globalTimeout time.Duration + if d.Timeout == 0 { + globalTimeout = time.Duration(math.MaxInt64) + } else { + globalTimeout = d.Timeout + } + for _, pod := range pods { + err := d.DeletePod(pod) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + } + _, err := waitForDelete(pods, 1*time.Second, globalTimeout, false, getPodFn, d.OnPodDeletedOrEvicted) + return err +} + +func waitForDelete(pods []corev1.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*corev1.Pod, error), onDoneFn func(pod *corev1.Pod, usingEviction bool)) ([]corev1.Pod, error) { + err := wait.PollImmediate(interval, timeout, func() (bool, error) { + pendingPods := []corev1.Pod{} + for i, pod := range pods { + p, err := getPodFn(pod.Namespace, pod.Name) + if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) { + if onDoneFn != nil { + onDoneFn(&pod, usingEviction) + } + continue + } else if err != nil { + return false, err + } else { + pendingPods = append(pendingPods, pods[i]) + } + } + pods = pendingPods + if len(pendingPods) > 0 { + return false, nil + } + return true, nil + }) + return pods, err +} diff --git a/pkg/drain/filters.go b/pkg/drain/filters.go new file mode 100644 index 0000000000..2cbba24563 --- /dev/null +++ b/pkg/drain/filters.go @@ -0,0 +1,223 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package drain + +import ( + "fmt" + "strings" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + daemonSetFatal = "DaemonSet-managed Pods (use --ignore-daemonsets to ignore)" + daemonSetWarning = "ignoring DaemonSet-managed Pods" + localStorageFatal = "Pods with local storage (use --delete-local-data to override)" + localStorageWarning = "deleting Pods with local storage" + unmanagedFatal = "Pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet (use --force to override)" + unmanagedWarning = "deleting Pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet" +) + +type podDelete struct { + pod corev1.Pod + status podDeleteStatus +} + +type podDeleteList struct { + items []podDelete +} + +func (l *podDeleteList) Pods() []corev1.Pod { + pods := []corev1.Pod{} + for _, i := range l.items { + if i.status.delete { + pods = append(pods, i.pod) + } + } + return pods +} + +func (l *podDeleteList) Warnings() string { + ps := make(map[string][]string) + for _, i := range l.items { + if i.status.reason == podDeleteStatusTypeWarning { + ps[i.status.message] = append(ps[i.status.message], fmt.Sprintf("%s/%s", i.pod.Namespace, i.pod.Name)) + } + } + + msgs := []string{} + for key, pods := range ps { + msgs = append(msgs, fmt.Sprintf("%s: %s", key, strings.Join(pods, ", "))) + } + return strings.Join(msgs, "; ") +} + +func (l *podDeleteList) errors() []error { + failedPods := make(map[string][]string) + for _, i := range l.items { + if i.status.reason == podDeleteStatusTypeError { + msg := i.status.message + if msg == "" { + msg = "unexpected error" + } + failedPods[msg] = append(failedPods[msg], fmt.Sprintf("%s/%s", i.pod.Namespace, i.pod.Name)) + } + } + errs := make([]error, 0) + for msg, pods := range failedPods { + errs = append(errs, fmt.Errorf("cannot delete %s: %s", msg, strings.Join(pods, ", "))) + } + return errs +} + +type podDeleteStatus struct { + delete bool + reason string + message string +} + +// Takes a pod and returns a PodDeleteStatus +type podFilter func(corev1.Pod) podDeleteStatus + +const ( + podDeleteStatusTypeOkay = "Okay" + podDeleteStatusTypeSkip = "Skip" + podDeleteStatusTypeWarning = "Warning" + podDeleteStatusTypeError = "Error" +) + +func makePodDeleteStatusOkay() podDeleteStatus { + return podDeleteStatus{ + delete: true, + reason: podDeleteStatusTypeOkay, + } +} + +func makePodDeleteStatusSkip() podDeleteStatus { + return podDeleteStatus{ + delete: false, + reason: podDeleteStatusTypeSkip, + } +} + +func makePodDeleteStatusWithWarning(delete bool, message string) podDeleteStatus { + return podDeleteStatus{ + delete: delete, + reason: podDeleteStatusTypeWarning, + message: message, + } +} + +func makePodDeleteStatusWithError(message string) podDeleteStatus { + return podDeleteStatus{ + delete: false, + reason: podDeleteStatusTypeError, + message: message, + } +} + +func (d *Helper) makeFilters() []podFilter { + return []podFilter{ + d.daemonSetFilter, + d.mirrorPodFilter, + d.localStorageFilter, + d.unreplicatedFilter, + } +} + +func hasLocalStorage(pod corev1.Pod) bool { + for _, volume := range pod.Spec.Volumes { + if volume.EmptyDir != nil { + return true + } + } + + return false +} + +func (d *Helper) daemonSetFilter(pod corev1.Pod) podDeleteStatus { + // Note that we return false in cases where the pod is DaemonSet managed, + // regardless of flags. + // + // The exception is for pods that are orphaned (the referencing + // management resource - including DaemonSet - is not found). + // Such pods will be deleted if --force is used. + controllerRef := metav1.GetControllerOf(&pod) + if controllerRef == nil || controllerRef.Kind != appsv1.SchemeGroupVersion.WithKind("DaemonSet").Kind { + return makePodDeleteStatusOkay() + } + // Any finished pod can be removed. + if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { + return makePodDeleteStatusOkay() + } + + if _, err := d.Client.AppsV1().DaemonSets(pod.Namespace).Get(controllerRef.Name, metav1.GetOptions{}); err != nil { + // remove orphaned pods with a warning if --force is used + if apierrors.IsNotFound(err) && d.Force { + return makePodDeleteStatusWithWarning(true, err.Error()) + } + + return makePodDeleteStatusWithError(err.Error()) + } + + if !d.IgnoreAllDaemonSets { + return makePodDeleteStatusWithError(daemonSetFatal) + } + + return makePodDeleteStatusWithWarning(false, daemonSetWarning) +} + +func (d *Helper) mirrorPodFilter(pod corev1.Pod) podDeleteStatus { + if _, found := pod.ObjectMeta.Annotations[corev1.MirrorPodAnnotationKey]; found { + return makePodDeleteStatusSkip() + } + return makePodDeleteStatusOkay() +} + +func (d *Helper) localStorageFilter(pod corev1.Pod) podDeleteStatus { + if !hasLocalStorage(pod) { + return makePodDeleteStatusOkay() + } + // Any finished pod can be removed. + if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { + return makePodDeleteStatusOkay() + } + if !d.DeleteLocalData { + return makePodDeleteStatusWithError(localStorageFatal) + } + + return makePodDeleteStatusWithWarning(true, localStorageWarning) +} + +func (d *Helper) unreplicatedFilter(pod corev1.Pod) podDeleteStatus { + // any finished pod can be removed + if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { + return makePodDeleteStatusOkay() + } + + controllerRef := metav1.GetControllerOf(&pod) + if controllerRef != nil { + return makePodDeleteStatusOkay() + } + if d.Force { + return makePodDeleteStatusWithWarning(true, unmanagedWarning) + } + return makePodDeleteStatusWithError(unmanagedFatal) +} diff --git a/pkg/instancegroups/BUILD.bazel b/pkg/instancegroups/BUILD.bazel index 50ca60e3fa..f3e72809be 100644 --- a/pkg/instancegroups/BUILD.bazel +++ b/pkg/instancegroups/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "//pkg/apis/kops:go_default_library", "//pkg/client/simple:go_default_library", "//pkg/cloudinstances:go_default_library", + "//pkg/drain:go_default_library", "//pkg/featureflag:go_default_library", "//pkg/validation:go_default_library", "//upup/pkg/fi:go_default_library", @@ -22,8 +23,6 @@ go_library( "//vendor/k8s.io/cli-runtime/pkg/genericclioptions:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/klog:go_default_library", - "//vendor/k8s.io/kubernetes/pkg/kubectl/cmd/drain:go_default_library", - "//vendor/k8s.io/kubernetes/pkg/kubectl/cmd/util:go_default_library", ], ) diff --git a/pkg/instancegroups/instancegroups.go b/pkg/instancegroups/instancegroups.go index 8684be14c7..fd210b3718 100644 --- a/pkg/instancegroups/instancegroups.go +++ b/pkg/instancegroups/instancegroups.go @@ -26,15 +26,13 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/klog" api "k8s.io/kops/pkg/apis/kops" "k8s.io/kops/pkg/cloudinstances" + "k8s.io/kops/pkg/drain" "k8s.io/kops/pkg/featureflag" "k8s.io/kops/pkg/validation" "k8s.io/kops/upup/pkg/fi" - cmddrain "k8s.io/kubernetes/pkg/kubectl/cmd/drain" - cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" ) // RollingUpdateInstanceGroup is the AWS ASG backing an InstanceGroup. @@ -315,42 +313,32 @@ func (r *RollingUpdateInstanceGroup) DeleteInstance(u *cloudinstances.CloudInsta // DrainNode drains a K8s node. func (r *RollingUpdateInstanceGroup) DrainNode(u *cloudinstances.CloudInstanceGroupMember, rollingUpdateData *RollingUpdateCluster) error { - if rollingUpdateData.ClientGetter == nil { - return fmt.Errorf("ClientGetter not set") + if rollingUpdateData.K8sClient == nil { + return fmt.Errorf("K8sClient not set") } if u.Node.Name == "" { return fmt.Errorf("node name not set") } - f := cmdutil.NewFactory(rollingUpdateData.ClientGetter) - streams := genericclioptions.IOStreams{ - Out: os.Stdout, - ErrOut: os.Stderr, + helper := &drain.Helper{ + Client: rollingUpdateData.K8sClient, + Force: true, + GracePeriodSeconds: -1, + IgnoreAllDaemonSets: true, + Out: os.Stdout, + ErrOut: os.Stderr, + + // Other options we might want to set: + // Timeout? + // DeleteLocalData? } - drain := cmddrain.NewCmdDrain(f, streams) - args := []string{u.Node.Name} - options := cmddrain.NewDrainOptions(f, streams) - - // Override some options - options.IgnoreDaemonsets = true - options.Force = true - options.DeleteLocalData = true - options.GracePeriodSeconds = -1 - - err := options.Complete(f, drain, args) - if err != nil { - return fmt.Errorf("error setting up drain: %v", err) + if err := drain.RunCordonOrUncordon(helper, u.Node, true); err != nil { + return fmt.Errorf("error cordoning node: %v", err) } - err = options.RunCordonOrUncordon(true) - if err != nil { - return fmt.Errorf("error cordoning node node: %v", err) - } - - err = options.RunDrain() - if err != nil { + if err := drain.RunNodeDrain(helper, u.Node.Name); err != nil { return fmt.Errorf("error draining node: %v", err) } diff --git a/pkg/model/network.go b/pkg/model/network.go index d6fffc7685..00514ed147 100644 --- a/pkg/model/network.go +++ b/pkg/model/network.go @@ -25,7 +25,7 @@ import ( "k8s.io/kops/upup/pkg/fi" "k8s.io/kops/upup/pkg/fi/cloudup/awstasks" "k8s.io/kops/upup/pkg/fi/cloudup/awsup" - "k8s.io/kubernetes/pkg/cloudprovider/providers/aws" + "k8s.io/legacy-cloud-providers/aws" ) // NetworkModelBuilder configures network objects diff --git a/pkg/pkiutil/BUILD.bazel b/pkg/pkiutil/BUILD.bazel new file mode 100644 index 0000000000..6fd8553abb --- /dev/null +++ b/pkg/pkiutil/BUILD.bazel @@ -0,0 +1,8 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["pki_helpers.go"], + importpath = "k8s.io/kops/pkg/pkiutil", + visibility = ["//visibility:public"], +) diff --git a/pkg/pkiutil/pki_helpers.go b/pkg/pkiutil/pki_helpers.go new file mode 100644 index 0000000000..a9b0c8acbc --- /dev/null +++ b/pkg/pkiutil/pki_helpers.go @@ -0,0 +1,100 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pkiutil + +import ( + "crypto" + cryptorand "crypto/rand" + "crypto/rsa" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "fmt" + "math" + "math/big" + "time" + + certutil "k8s.io/client-go/util/cert" +) + +const ( + // CertificateBlockType is a possible value for pem.Block.Type. + CertificateBlockType = "CERTIFICATE" + + // RSAPrivateKeyBlockType is a possible value for pem.Block.Type. + RSAPrivateKeyBlockType = "RSA PRIVATE KEY" + + rsaKeySize = 2048 + + duration365d = time.Hour * 24 * 365 +) + +// EncodeCertPEM returns PEM-endcoded certificate data +func EncodeCertPEM(cert *x509.Certificate) []byte { + block := pem.Block{ + Type: CertificateBlockType, + Bytes: cert.Raw, + } + return pem.EncodeToMemory(&block) +} + +// EncodePrivateKeyPEM returns PEM-encoded private key data +func EncodePrivateKeyPEM(key *rsa.PrivateKey) []byte { + block := pem.Block{ + Type: RSAPrivateKeyBlockType, + Bytes: x509.MarshalPKCS1PrivateKey(key), + } + return pem.EncodeToMemory(&block) +} + +// NewPrivateKey creates an RSA private key +func NewPrivateKey() (*rsa.PrivateKey, error) { + return rsa.GenerateKey(cryptorand.Reader, rsaKeySize) +} + +// NewSignedCert creates a signed certificate using the given CA certificate and key +func NewSignedCert(cfg *certutil.Config, key crypto.Signer, caCert *x509.Certificate, caKey crypto.Signer) (*x509.Certificate, error) { + serial, err := cryptorand.Int(cryptorand.Reader, new(big.Int).SetInt64(math.MaxInt64)) + if err != nil { + return nil, err + } + if len(cfg.CommonName) == 0 { + return nil, fmt.Errorf("must specify a CommonName") + } + if len(cfg.Usages) == 0 { + return nil, fmt.Errorf("must specify at least one ExtKeyUsage") + } + + certTmpl := x509.Certificate{ + Subject: pkix.Name{ + CommonName: cfg.CommonName, + Organization: cfg.Organization, + }, + DNSNames: cfg.AltNames.DNSNames, + IPAddresses: cfg.AltNames.IPs, + SerialNumber: serial, + NotBefore: caCert.NotBefore, + NotAfter: time.Now().Add(duration365d).UTC(), + KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature, + ExtKeyUsage: cfg.Usages, + } + certDERBytes, err := x509.CreateCertificate(cryptorand.Reader, &certTmpl, caCert, key.Public(), caKey) + if err != nil { + return nil, err + } + return x509.ParseCertificate(certDERBytes) +} diff --git a/protokube/pkg/protokube/volume_mounter.go b/protokube/pkg/protokube/volume_mounter.go index 0778d6e97c..e99c6c3016 100644 --- a/protokube/pkg/protokube/volume_mounter.go +++ b/protokube/pkg/protokube/volume_mounter.go @@ -25,8 +25,9 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog" "k8s.io/kubernetes/pkg/util/mount" - "k8s.io/kubernetes/pkg/util/nsenter" // moves to k8s.io/utils/nsenter in 1.14 + nsenterutil "k8s.io/kubernetes/pkg/volume/util/nsenter" utilsexec "k8s.io/utils/exec" + "k8s.io/utils/nsenter" ) type VolumeMountController struct { @@ -123,7 +124,7 @@ func (k *VolumeMountController) safeFormatAndMount(volume *Volume, mountpoint st sharedDir := "/no-shared-directories" // Build mount & exec implementations that execute in the host namespaces - safeFormatAndMount.Interface = mount.NewNsenterMounter(sharedDir, ne) + safeFormatAndMount.Interface = nsenterutil.NewMounter(sharedDir, ne) safeFormatAndMount.Exec = NewNsEnterExec() // Note that we don't use pathFor for operations going through safeFormatAndMount, diff --git a/upup/pkg/fi/cloudup/awsup/aws_cloud.go b/upup/pkg/fi/cloudup/awsup/aws_cloud.go index 5a676b03f8..813aa33384 100644 --- a/upup/pkg/fi/cloudup/awsup/aws_cloud.go +++ b/upup/pkg/fi/cloudup/awsup/aws_cloud.go @@ -51,7 +51,7 @@ import ( "k8s.io/kops/pkg/featureflag" "k8s.io/kops/pkg/resources/spotinst" "k8s.io/kops/upup/pkg/fi" - k8s_aws "k8s.io/kubernetes/pkg/cloudprovider/providers/aws" + k8s_aws "k8s.io/legacy-cloud-providers/aws" ) // By default, aws-sdk-go only retries 3 times, which doesn't give