Support pruning in channels command

We let the addon specify exactly what should be pruned; this approach
is a little more verbose but we're likely generating this
automatically in the common case anyway.

In return for the verbosity, we can likely handle more future cases
and edge cases (for example removing objects that aren't labelled or
are in the wrong namespace).
This commit is contained in:
justinsb 2021-08-15 00:18:32 -04:00
parent 3568bf6e5f
commit 454c47f92b
9 changed files with 291 additions and 30 deletions

View File

@ -69,6 +69,19 @@ type AddonSpec struct {
NeedsPKI bool `json:"needsPKI,omitempty"`
Version string `json:"version,omitempty"`
Prune *PruneSpec `json:"prune,omitempty"`
}
type PruneSpec struct {
Kinds []PruneKindSpec `json:"kinds,omitempty"`
}
type PruneKindSpec struct {
Group string `json:"group,omitempty"`
Kind string `json:"kind,omitempty"`
Namespaces []string `json:"namespaces,omitempty"`
LabelSelector string `json:"labelSelector,omitempty"`
}
func (a *Addons) Verify() error {

View File

@ -7,11 +7,13 @@ go_library(
"addons.go",
"apply.go",
"channel_version.go",
"prune.go",
],
importpath = "k8s.io/kops/channels/pkg/channels",
visibility = ["//visibility:public"],
deps = [
"//channels/pkg/api:go_default_library",
"//pkg/kubemanifest:go_default_library",
"//pkg/pki:go_default_library",
"//upup/pkg/fi/utils:go_default_library",
"//util/pkg/vfs:go_default_library",
@ -21,9 +23,13 @@ go_library(
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
"//vendor/k8s.io/client-go/dynamic:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/restmapper:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
)

View File

@ -24,6 +24,7 @@ import (
"net/url"
"k8s.io/kops/pkg/pki"
"k8s.io/kops/util/pkg/vfs"
certmanager "github.com/jetstack/cert-manager/pkg/client/clientset/versioned"
"k8s.io/apimachinery/pkg/api/errors"
@ -151,7 +152,7 @@ func (a *Addon) GetManifestFullUrl() (*url.URL, error) {
return manifestURL, nil
}
func (a *Addon) EnsureUpdated(ctx context.Context, k8sClient kubernetes.Interface, cmClient certmanager.Interface) (*AddonUpdate, error) {
func (a *Addon) EnsureUpdated(ctx context.Context, k8sClient kubernetes.Interface, cmClient certmanager.Interface, pruner *Pruner) (*AddonUpdate, error) {
required, err := a.GetRequiredUpdates(ctx, k8sClient, cmClient)
if err != nil {
return nil, err
@ -167,9 +168,18 @@ func (a *Addon) EnsureUpdated(ctx context.Context, k8sClient kubernetes.Interfac
}
klog.Infof("Applying update from %q", manifestURL)
err = Apply(manifestURL.String())
// We copy the manifest to a temp file because it is likely e.g. an s3 URL, which kubectl can't read
data, err := vfs.Context.ReadFile(manifestURL.String())
if err != nil {
return nil, fmt.Errorf("error applying update from %q: %v", manifestURL, err)
return nil, fmt.Errorf("error reading manifest: %w", err)
}
if err := Apply(data); err != nil {
return nil, fmt.Errorf("error applying update from %q: %w", manifestURL, err)
}
if err := pruner.Prune(ctx, data, a.Spec.Prune); err != nil {
return nil, fmt.Errorf("error pruning manifest from %q: %w", manifestURL, err)
}
if err := a.AddNeedsUpdateLabel(ctx, k8sClient, required); err != nil {

View File

@ -25,18 +25,12 @@ import (
"strings"
"k8s.io/klog/v2"
"k8s.io/kops/util/pkg/vfs"
)
// Apply calls kubectl apply to apply the manifest.
// We will likely in future change this to create things directly (or more likely embed this logic into kubectl itself)
func Apply(manifest string) error {
func Apply(data []byte) error {
// We copy the manifest to a temp file because it is likely e.g. an s3 URL, which kubectl can't read
data, err := vfs.Context.ReadFile(manifest)
if err != nil {
return fmt.Errorf("error reading manifest: %v", err)
}
tmpDir, err := ioutil.TempDir("", "channel")
if err != nil {
return fmt.Errorf("error creating temp dir: %v", err)

View File

@ -0,0 +1,144 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package channels
import (
"context"
"fmt"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/restmapper"
"k8s.io/klog/v2"
"k8s.io/kops/channels/pkg/api"
"k8s.io/kops/pkg/kubemanifest"
)
type Pruner struct {
Client dynamic.Interface
RESTMapper *restmapper.DeferredDiscoveryRESTMapper
}
// Prune prunes objects not in the manifest, according to PruneSpec.
func (p *Pruner) Prune(ctx context.Context, manifest []byte, spec *api.PruneSpec) error {
if spec == nil {
return nil
}
objects, err := kubemanifest.LoadObjectsFrom(manifest)
if err != nil {
return fmt.Errorf("failed to parse objects: %w", err)
}
objectsByKind := make(map[schema.GroupKind][]*kubemanifest.Object)
for _, object := range objects {
gv, err := schema.ParseGroupVersion(object.APIVersion())
if err != nil || gv.Version == "" {
return fmt.Errorf("failed to parse apiVersion %q", object.APIVersion())
}
kind := object.Kind()
if kind == "" {
return fmt.Errorf("failed to find kind in object")
}
gvk := gv.WithKind(kind)
gk := gvk.GroupKind()
objectsByKind[gk] = append(objectsByKind[gk], object)
}
for i := range spec.Kinds {
pruneKind := &spec.Kinds[i]
gk := schema.GroupKind{Group: pruneKind.Group, Kind: pruneKind.Kind}
if err := p.pruneObjectsOfKind(ctx, gk, pruneKind, objectsByKind[gk]); err != nil {
return fmt.Errorf("failed to prune objects of kind %s: %w", gk, err)
}
}
return nil
}
func (p *Pruner) pruneObjectsOfKind(ctx context.Context, gk schema.GroupKind, spec *api.PruneKindSpec, keepObjects []*kubemanifest.Object) error {
restMapping, err := p.RESTMapper.RESTMapping(gk)
if err != nil {
return fmt.Errorf("unable to find resource for %s: %w", gk, err)
}
gvr := restMapping.Resource
var listOptions v1.ListOptions
listOptions.LabelSelector = spec.LabelSelector
baseResource := p.Client.Resource(gvr)
if len(spec.Namespaces) == 0 {
objects, err := baseResource.List(ctx, listOptions)
if err != nil {
return fmt.Errorf("error listing objects: %w", err)
}
if err := p.pruneObjects(ctx, gvr, objects, keepObjects); err != nil {
return err
}
} else {
for _, namespace := range spec.Namespaces {
resource := baseResource.Namespace(namespace)
actualObjects, err := resource.List(ctx, listOptions)
if err != nil {
return fmt.Errorf("error listing objects in namespace %s: %w", namespace, err)
}
if err := p.pruneObjects(ctx, gvr, actualObjects, keepObjects); err != nil {
return err
}
}
}
return nil
}
func (p *Pruner) pruneObjects(ctx context.Context, gvr schema.GroupVersionResource, actualObjects *unstructured.UnstructuredList, keepObjects []*kubemanifest.Object) error {
keepMap := make(map[string]*kubemanifest.Object)
for _, keepObject := range keepObjects {
key := keepObject.GetNamespace() + "/" + keepObject.GetName()
keepMap[key] = keepObject
}
for _, actualObject := range actualObjects.Items {
name := actualObject.GetName()
namespace := actualObject.GetNamespace()
key := namespace + "/" + name
if _, found := keepMap[key]; found {
// Object is in manifest, don't delete
continue
}
klog.Infof("pruning %s %s", gvr, key)
var resource dynamic.ResourceInterface
if namespace != "" {
resource = p.Client.Resource(gvr).Namespace(namespace)
} else {
resource = p.Client.Resource(gvr)
}
var opts v1.DeleteOptions
if err := resource.Delete(ctx, name, opts); err != nil {
return fmt.Errorf("failed to delete %s: %w", key, err)
}
}
return nil
}

View File

@ -22,10 +22,12 @@ go_library(
"//vendor/github.com/spf13/viper:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/cli-runtime/pkg/genericclioptions:go_default_library",
"//vendor/k8s.io/client-go/dynamic:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/plugin/pkg/client/auth:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
"//vendor/k8s.io/client-go/restmapper:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],
)

View File

@ -66,6 +66,16 @@ func RunApplyChannel(ctx context.Context, f Factory, out io.Writer, options *App
return err
}
dynamicClient, err := f.DynamicClient()
if err != nil {
return err
}
restMapper, err := f.RESTMapper()
if err != nil {
return err
}
kubernetesVersionInfo, err := k8sClient.Discovery().ServerVersion()
if err != nil {
return fmt.Errorf("error querying kubernetes version: %v", err)
@ -200,8 +210,13 @@ func RunApplyChannel(ctx context.Context, f Factory, out io.Writer, options *App
return nil
}
pruner := &channels.Pruner{
Client: dynamicClient,
RESTMapper: restMapper,
}
for _, needUpdate := range needUpdates {
update, err := needUpdate.EnsureUpdated(ctx, k8sClient, cmClient)
update, err := needUpdate.EnsureUpdated(ctx, k8sClient, cmClient, pruner)
if err != nil {
fmt.Printf("error updating %q: %v", needUpdate.Name, err)
} else if update != nil {

View File

@ -19,9 +19,11 @@ package cmd
import (
"fmt"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/restmapper"
_ "k8s.io/client-go/plugin/pkg/client/auth"
@ -31,37 +33,43 @@ import (
type Factory interface {
KubernetesClient() (kubernetes.Interface, error)
CertManagerClient() (certmanager.Interface, error)
RESTMapper() (*restmapper.DeferredDiscoveryRESTMapper, error)
DynamicClient() (dynamic.Interface, error)
}
type DefaultFactory struct {
ConfigFlags genericclioptions.ConfigFlags
kubernetesClient kubernetes.Interface
certManagerClient certmanager.Interface
cachedRESTConfig *rest.Config
dynamicClient dynamic.Interface
restMapper *restmapper.DeferredDiscoveryRESTMapper
}
var _ Factory = &DefaultFactory{}
func loadConfig() (*rest.Config, error) {
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
loadingRules.DefaultClientConfig = &clientcmd.DefaultClientConfig
configOverrides := &clientcmd.ConfigOverrides{
ClusterDefaults: clientcmd.ClusterDefaults,
func (f *DefaultFactory) restConfig() (*rest.Config, error) {
if f.cachedRESTConfig == nil {
restConfig, err := f.ConfigFlags.ToRESTConfig()
if err != nil {
return nil, fmt.Errorf("cannot load kubecfg settings: %w", err)
}
f.cachedRESTConfig = restConfig
}
kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides)
return kubeConfig.ClientConfig()
return f.cachedRESTConfig, nil
}
func (f *DefaultFactory) KubernetesClient() (kubernetes.Interface, error) {
if f.kubernetesClient == nil {
config, err := loadConfig()
restConfig, err := f.restConfig()
if err != nil {
return nil, fmt.Errorf("cannot load kubecfg settings: %v", err)
return nil, err
}
k8sClient, err := kubernetes.NewForConfig(config)
k8sClient, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("cannot build kube client: %v", err)
return nil, fmt.Errorf("cannot build kube client: %w", err)
}
f.kubernetesClient = k8sClient
}
@ -69,13 +77,29 @@ func (f *DefaultFactory) KubernetesClient() (kubernetes.Interface, error) {
return f.kubernetesClient, nil
}
func (f *DefaultFactory) DynamicClient() (dynamic.Interface, error) {
if f.dynamicClient == nil {
restConfig, err := f.restConfig()
if err != nil {
return nil, fmt.Errorf("cannot load kubecfg settings: %w", err)
}
dynamicClient, err := dynamic.NewForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("cannot build dynamicClient client: %v", err)
}
f.dynamicClient = dynamicClient
}
return f.dynamicClient, nil
}
func (f *DefaultFactory) CertManagerClient() (certmanager.Interface, error) {
if f.certManagerClient == nil {
config, err := loadConfig()
restConfig, err := f.restConfig()
if err != nil {
return nil, fmt.Errorf("cannot load kubecfg settings: %v", err)
return nil, err
}
certManagerClient, err := certmanager.NewForConfig(config)
certManagerClient, err := certmanager.NewForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("cannot build kube client: %v", err)
}
@ -84,3 +108,18 @@ func (f *DefaultFactory) CertManagerClient() (certmanager.Interface, error) {
return f.certManagerClient, nil
}
func (f *DefaultFactory) RESTMapper() (*restmapper.DeferredDiscoveryRESTMapper, error) {
if f.restMapper == nil {
discoveryClient, err := f.ConfigFlags.ToDiscoveryClient()
if err != nil {
return nil, err
}
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
f.restMapper = restMapper
}
return f.restMapper, nil
}

View File

@ -140,6 +140,44 @@ func (m *Object) Kind() string {
return s
}
// GetNamespace returns the namespace field of the object, or "" if it cannot be found or is invalid
func (m *Object) GetNamespace() string {
metadata := m.metadata()
return getStringValue(metadata, "namespace")
}
// GetName returns the namespace field of the object, or "" if it cannot be found or is invalid
func (m *Object) GetName() string {
metadata := m.metadata()
return getStringValue(metadata, "name")
}
// getStringValue returns the specified field of the object, or "" if it cannot be found or is invalid
func getStringValue(m map[string]interface{}, key string) string {
v, found := m[key]
if !found {
return ""
}
s, ok := v.(string)
if !ok {
return ""
}
return s
}
// metadata returns the metadata map of the object, or nil if it cannot be found or is invalid
func (m *Object) metadata() map[string]interface{} {
v, found := m.data["metadata"]
if !found {
return nil
}
metadata, ok := v.(map[string]interface{})
if !ok {
return nil
}
return metadata
}
// APIVersion returns the apiVersion field of the object, or "" if it cannot be found or is invalid
func (m *Object) APIVersion() string {
v, found := m.data["apiVersion"]