From 859a9fd9f1e0c4804e04bb68d723ae34f7cc930d Mon Sep 17 00:00:00 2001 From: justinsb Date: Fri, 27 Dec 2024 15:32:26 -0500 Subject: [PATCH] chore: refactor factory to accept a cluster This should allow us to build our own rest config in future, rather than relying on the kubeconfig being configured correctly. To do this, we need to stop sharing the factory between the channels and kops commands. --- channels/cmd/channels/main.go | 19 ++- channels/pkg/cmd/apply.go | 2 +- channels/pkg/cmd/apply_channel.go | 28 +++-- channels/pkg/cmd/factory.go | 99 ++++++++++++++-- channels/pkg/cmd/get.go | 2 +- channels/pkg/cmd/get_addons.go | 16 ++- channels/pkg/cmd/root.go | 7 +- cmd/kops/delete_instance.go | 85 +++++++++----- cmd/kops/get_instances.go | 32 ++--- cmd/kops/rolling-update_cluster.go | 31 ++--- cmd/kops/toolbox_addons.go | 7 +- cmd/kops/toolbox_dump.go | 1 + cmd/kops/util/factory.go | 148 ++++++++++++++---------- cmd/kops/validate_cluster.go | 39 ++++--- pkg/commands/commandutils/factory.go | 4 + pkg/commands/toolbox_enroll.go | 10 +- pkg/instancegroups/rollingupdate.go | 4 +- pkg/validation/validate_cluster.go | 9 +- pkg/validation/validate_cluster_test.go | 11 +- 19 files changed, 367 insertions(+), 187 deletions(-) diff --git a/channels/cmd/channels/main.go b/channels/cmd/channels/main.go index 52581881cb..c9c60c485d 100644 --- a/channels/cmd/channels/main.go +++ b/channels/cmd/channels/main.go @@ -17,20 +17,29 @@ limitations under the License. package main import ( + "context" "fmt" "os" "k8s.io/klog/v2" + "k8s.io/kops/channels/pkg/cmd" - "k8s.io/kops/cmd/kops/util" ) func main() { - klog.InitFlags(nil) - - f := util.NewFactory(nil) - if err := cmd.Execute(f, os.Stdout); err != nil { + if err := run(context.Background()); err != nil { fmt.Fprintf(os.Stderr, "\n%v\n", err) os.Exit(1) } } + +func run(ctx context.Context) error { + klog.InitFlags(nil) + + f := cmd.NewChannelsFactory() + + if err := cmd.Execute(ctx, f, os.Stdout); err != nil { + return err + } + return nil +} diff --git a/channels/pkg/cmd/apply.go b/channels/pkg/cmd/apply.go index dc1492c416..c9f56a0399 100644 --- a/channels/pkg/cmd/apply.go +++ b/channels/pkg/cmd/apply.go @@ -22,7 +22,7 @@ import ( "github.com/spf13/cobra" ) -func NewCmdApply(f Factory, out io.Writer) *cobra.Command { +func NewCmdApply(f *ChannelsFactory, out io.Writer) *cobra.Command { cmd := &cobra.Command{ Use: "apply", Short: "apply resources from a channel", diff --git a/channels/pkg/cmd/apply_channel.go b/channels/pkg/cmd/apply_channel.go index e12d2a231e..65174b737c 100644 --- a/channels/pkg/cmd/apply_channel.go +++ b/channels/pkg/cmd/apply_channel.go @@ -25,12 +25,15 @@ import ( "github.com/blang/semver/v4" "github.com/cert-manager/cert-manager/pkg/client/clientset/versioned" + certmanager "github.com/cert-manager/cert-manager/pkg/client/clientset/versioned" "github.com/spf13/cobra" "go.uber.org/multierr" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/restmapper" + "k8s.io/kops/channels/pkg/channels" "k8s.io/kops/util/pkg/tables" "k8s.io/kops/util/pkg/vfs" @@ -38,9 +41,11 @@ import ( type ApplyChannelOptions struct { Yes bool + + configFlags genericclioptions.ConfigFlags } -func NewCmdApplyChannel(f Factory, out io.Writer) *cobra.Command { +func NewCmdApplyChannel(f *ChannelsFactory, out io.Writer) *cobra.Command { var options ApplyChannelOptions cmd := &cobra.Command{ @@ -57,20 +62,29 @@ func NewCmdApplyChannel(f Factory, out io.Writer) *cobra.Command { return cmd } -func RunApplyChannel(ctx context.Context, f Factory, out io.Writer, options *ApplyChannelOptions, args []string) error { - k8sClient, err := f.KubernetesClient() +func RunApplyChannel(ctx context.Context, f *ChannelsFactory, out io.Writer, options *ApplyChannelOptions, args []string) error { + restConfig, err := f.RESTConfig() + if err != nil { + return err + } + httpClient, err := f.HTTPClient() if err != nil { return err } - cmClient, err := f.CertManagerClient() + k8sClient, err := kubernetes.NewForConfigAndClient(restConfig, httpClient) if err != nil { - return err + return fmt.Errorf("building kube client: %w", err) + } + + cmClient, err := certmanager.NewForConfigAndClient(restConfig, httpClient) + if err != nil { + return fmt.Errorf("building cert manager client: %w", err) } dynamicClient, err := f.DynamicClient() if err != nil { - return err + return fmt.Errorf("building dynamic client: %w", err) } restMapper, err := f.RESTMapper() @@ -92,7 +106,7 @@ func RunApplyChannel(ctx context.Context, f Factory, out io.Writer, options *App kubernetesVersion.Pre = nil if len(args) != 1 { - return fmt.Errorf("unexpected number of arguments. Only one channel may be processed at the same time.") + return fmt.Errorf("unexpected number of arguments. Only one channel may be processed at the same time") } channelLocation := args[0] diff --git a/channels/pkg/cmd/factory.go b/channels/pkg/cmd/factory.go index 932d24d81e..c6c57aa4cc 100644 --- a/channels/pkg/cmd/factory.go +++ b/channels/pkg/cmd/factory.go @@ -17,20 +17,101 @@ limitations under the License. package cmd import ( + "fmt" + "net/http" + + "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/client-go/dynamic" - "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" "k8s.io/client-go/restmapper" "k8s.io/kops/util/pkg/vfs" _ "k8s.io/client-go/plugin/pkg/client/auth" - - certmanager "github.com/cert-manager/cert-manager/pkg/client/clientset/versioned" ) -type Factory interface { - VFSContext() *vfs.VFSContext - KubernetesClient() (kubernetes.Interface, error) - CertManagerClient() (certmanager.Interface, error) - RESTMapper() (*restmapper.DeferredDiscoveryRESTMapper, error) - DynamicClient() (dynamic.Interface, error) +type ChannelsFactory struct { + configFlags genericclioptions.ConfigFlags + cachedRESTConfig *rest.Config + cachedHTTPClient *http.Client + vfsContext *vfs.VFSContext + restMapper *restmapper.DeferredDiscoveryRESTMapper + dynamicClient dynamic.Interface +} + +func NewChannelsFactory() *ChannelsFactory { + return &ChannelsFactory{} +} + +func (f *ChannelsFactory) RESTConfig() (*rest.Config, error) { + if f.cachedRESTConfig == nil { + clientGetter := genericclioptions.NewConfigFlags(true) + + restConfig, err := clientGetter.ToRESTConfig() + if err != nil { + return nil, fmt.Errorf("cannot load kubecfg settings: %w", err) + } + + restConfig.UserAgent = "kops" + restConfig.Burst = 50 + restConfig.QPS = 20 + f.cachedRESTConfig = restConfig + } + return f.cachedRESTConfig, nil +} + +func (f *ChannelsFactory) HTTPClient() (*http.Client, error) { + if f.cachedHTTPClient == nil { + restConfig, err := f.RESTConfig() + if err != nil { + return nil, err + } + httpClient, err := rest.HTTPClientFor(restConfig) + if err != nil { + return nil, fmt.Errorf("getting http client: %w", err) + } + f.cachedHTTPClient = httpClient + } + return f.cachedHTTPClient, nil +} + +func (f *ChannelsFactory) RESTMapper() (*restmapper.DeferredDiscoveryRESTMapper, error) { + if f.restMapper == nil { + discoveryClient, err := f.configFlags.ToDiscoveryClient() + if err != nil { + return nil, err + } + + restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) + + f.restMapper = restMapper + } + + return f.restMapper, nil +} + +func (f *ChannelsFactory) DynamicClient() (dynamic.Interface, error) { + if f.dynamicClient == nil { + restConfig, err := f.RESTConfig() + if err != nil { + return nil, err + } + httpClient, err := f.HTTPClient() + if err != nil { + return nil, err + } + dynamicClient, err := dynamic.NewForConfigAndClient(restConfig, httpClient) + if err != nil { + return nil, err + } + f.dynamicClient = dynamicClient + } + return f.dynamicClient, nil +} + +func (f *ChannelsFactory) VFSContext() *vfs.VFSContext { + if f.vfsContext == nil { + // TODO vfs.NewVFSContext() + f.vfsContext = vfs.Context + } + return f.vfsContext } diff --git a/channels/pkg/cmd/get.go b/channels/pkg/cmd/get.go index 1e0e018d96..be57ad26f4 100644 --- a/channels/pkg/cmd/get.go +++ b/channels/pkg/cmd/get.go @@ -22,7 +22,7 @@ import ( "github.com/spf13/cobra" ) -func NewCmdGet(f Factory, out io.Writer) *cobra.Command { +func NewCmdGet(f *ChannelsFactory, out io.Writer) *cobra.Command { cmd := &cobra.Command{ Use: "get", SuggestFor: []string{"list"}, diff --git a/channels/pkg/cmd/get_addons.go b/channels/pkg/cmd/get_addons.go index 3baada9249..58151321a5 100644 --- a/channels/pkg/cmd/get_addons.go +++ b/channels/pkg/cmd/get_addons.go @@ -41,13 +41,14 @@ import ( "github.com/spf13/cobra" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" "k8s.io/kops/channels/pkg/channels" "k8s.io/kops/util/pkg/tables" ) type GetAddonsOptions struct{} -func NewCmdGetAddons(f Factory, out io.Writer) *cobra.Command { +func NewCmdGetAddons(f *ChannelsFactory, out io.Writer) *cobra.Command { var options GetAddonsOptions cmd := &cobra.Command{ @@ -70,11 +71,20 @@ type addonInfo struct { Namespace *v1.Namespace } -func RunGetAddons(ctx context.Context, f Factory, out io.Writer, options *GetAddonsOptions) error { - k8sClient, err := f.KubernetesClient() +func RunGetAddons(ctx context.Context, f *ChannelsFactory, out io.Writer, options *GetAddonsOptions) error { + restConfig, err := f.RESTConfig() if err != nil { return err } + httpClient, err := f.HTTPClient() + if err != nil { + return err + } + + k8sClient, err := kubernetes.NewForConfigAndClient(restConfig, httpClient) + if err != nil { + return fmt.Errorf("building kube client: %w", err) + } namespaces, err := k8sClient.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) if err != nil { diff --git a/channels/pkg/cmd/root.go b/channels/pkg/cmd/root.go index cc6c1193bd..0532736633 100644 --- a/channels/pkg/cmd/root.go +++ b/channels/pkg/cmd/root.go @@ -17,6 +17,7 @@ limitations under the License. package cmd import ( + "context" goflag "flag" "fmt" "io" @@ -29,17 +30,17 @@ type CmdRootOptions struct { configFile string } -func Execute(f Factory, out io.Writer) error { +func Execute(ctx context.Context, f *ChannelsFactory, out io.Writer) error { cobra.OnInitialize(initConfig) cmd := NewCmdRoot(f, out) goflag.Set("logtostderr", "true") goflag.CommandLine.Parse([]string{}) - return cmd.Execute() + return cmd.ExecuteContext(ctx) } -func NewCmdRoot(f Factory, out io.Writer) *cobra.Command { +func NewCmdRoot(f *ChannelsFactory, out io.Writer) *cobra.Command { options := &CmdRootOptions{} cmd := &cobra.Command{ diff --git a/cmd/kops/delete_instance.go b/cmd/kops/delete_instance.go index 0f8a0ab9e5..214f09654c 100644 --- a/cmd/kops/delete_instance.go +++ b/cmd/kops/delete_instance.go @@ -30,6 +30,7 @@ import ( "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth" + "k8s.io/client-go/rest" "k8s.io/kops/cmd/kops/util" kopsapi "k8s.io/kops/pkg/apis/kops" "k8s.io/kops/pkg/cloudinstances" @@ -163,11 +164,28 @@ func RunDeleteInstance(ctx context.Context, f *util.Factory, out io.Writer, opti return err } - var nodes []v1.Node var k8sClient kubernetes.Interface - var host string + var restConfig *rest.Config if !options.CloudOnly { - k8sClient, host, nodes, err = getNodes(ctx, cluster, true) + restConfig, err = f.RESTConfig(cluster) + if err != nil { + return fmt.Errorf("getting rest config: %w", err) + } + + httpClient, err := f.HTTPClient(cluster) + if err != nil { + return fmt.Errorf("getting http client: %w", err) + } + + k8sClient, err = kubernetes.NewForConfigAndClient(restConfig, httpClient) + if err != nil { + return fmt.Errorf("cannot build kube client: %w", err) + } + } + + var nodes []v1.Node + if !options.CloudOnly { + nodes, err = getNodes(ctx, k8sClient, true) if err != nil { return err } @@ -241,7 +259,7 @@ func RunDeleteInstance(ctx context.Context, f *util.Factory, out io.Writer, opti var clusterValidator validation.ClusterValidator if !options.CloudOnly { - clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, host, k8sClient) + clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, restConfig, k8sClient) if err != nil { return fmt.Errorf("cannot create cluster validator: %v", err) } @@ -251,37 +269,22 @@ func RunDeleteInstance(ctx context.Context, f *util.Factory, out io.Writer, opti return d.UpdateSingleInstance(cloudMember, options.Surge) } -func getNodes(ctx context.Context, cluster *kopsapi.Cluster, verbose bool) (kubernetes.Interface, string, []v1.Node, error) { +func getNodes(ctx context.Context, kubeClient kubernetes.Interface, verbose bool) ([]v1.Node, error) { var nodes []v1.Node - var k8sClient kubernetes.Interface - contextName := cluster.ObjectMeta.Name - clientGetter := genericclioptions.NewConfigFlags(true) - clientGetter.Context = &contextName - - config, err := clientGetter.ToRESTConfig() - if err != nil { - return nil, "", nil, fmt.Errorf("cannot load kubecfg settings for %q: %v", contextName, err) - } - - k8sClient, err = kubernetes.NewForConfig(config) - if err != nil { - return nil, "", nil, fmt.Errorf("cannot build kube client for %q: %v", contextName, err) - } - - nodeList, err := k8sClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + nodeList, err := kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) if err != nil { if verbose { fmt.Fprintf(os.Stderr, "Unable to reach the kubernetes API.\n") fmt.Fprintf(os.Stderr, "Use --cloudonly to do a deletion without confirming progress with the k8s API\n\n") } - return nil, "", nil, fmt.Errorf("listing nodes in cluster: %v", err) + return nil, fmt.Errorf("listing nodes in cluster: %v", err) } if nodeList != nil { nodes = nodeList.Items } - return k8sClient, config.Host, nodes, nil + return nodes, nil } func deleteNodeMatch(cloudMember *cloudinstances.CloudInstance, options *DeleteInstanceOptions) bool { @@ -320,15 +323,25 @@ func completeInstanceOrNode(f commandutils.Factory, options *DeleteInstanceOptio return completions, directive } - var nodes []v1.Node - var err error + var kubeClient kubernetes.Interface if !options.CloudOnly { - _, _, nodes, err = getNodes(ctx, cluster, false) + var err error + kubeClient, err = getKubeClientFromKubeconfig(ctx, cluster) if err != nil { cobra.CompErrorln(err.Error()) } } + var nodes []v1.Node + if kubeClient != nil { + nodeList, err := kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + if err != nil { + cobra.CompErrorln(err.Error()) + } else if nodeList != nil { + nodes = nodeList.Items + } + } + list, err := clientSet.InstanceGroupsFor(cluster).List(ctx, metav1.ListOptions{}) if err != nil { return commandutils.CompletionError("listing instance groups", err) @@ -369,6 +382,26 @@ func completeInstanceOrNode(f commandutils.Factory, options *DeleteInstanceOptio } } +// getKubeClientFromKubeconfig returns a kubernetes client from the kubeconfig, +// assuming it has already been exported. This is not ideal, but is reasonable +// for command completion. +func getKubeClientFromKubeconfig(ctx context.Context, cluster *kopsapi.Cluster) (kubernetes.Interface, error) { + contextName := cluster.ObjectMeta.Name + clientGetter := genericclioptions.NewConfigFlags(true) + clientGetter.Context = &contextName + + config, err := clientGetter.ToRESTConfig() + if err != nil { + return nil, fmt.Errorf("cannot load kubecfg settings for %q: %w", contextName, err) + } + + k8sClient, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, fmt.Errorf("cannot build kube client for %q: %w", contextName, err) + } + return k8sClient, nil +} + func appendInstance(completions []string, instance *cloudinstances.CloudInstance, longestGroup int) []string { completion := instance.ID if instance.CloudInstanceGroup.InstanceGroup != nil { diff --git a/cmd/kops/get_instances.go b/cmd/kops/get_instances.go index acc1adb845..8d039a4470 100644 --- a/cmd/kops/get_instances.go +++ b/cmd/kops/get_instances.go @@ -23,6 +23,7 @@ import ( "io" "strings" + "k8s.io/client-go/kubernetes" "k8s.io/kops/pkg/cloudinstances" "k8s.io/kops/pkg/commands/commandutils" "k8s.io/kubectl/pkg/util/i18n" @@ -31,10 +32,7 @@ import ( "k8s.io/klog/v2" - "k8s.io/client-go/kubernetes" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/kops/util/pkg/tables" @@ -101,11 +99,21 @@ func RunGetInstances(ctx context.Context, f *util.Factory, out io.Writer, option return err } - k8sClient, err := createK8sClient(cluster) + restConfig, err := f.RESTConfig(cluster) if err != nil { return err } + httpClient, err := f.HTTPClient(cluster) + if err != nil { + return err + } + + k8sClient, err := kubernetes.NewForConfigAndClient(restConfig, httpClient) + if err != nil { + return fmt.Errorf("building kubernetes client: %w", err) + } + nodeList, err := k8sClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) if err != nil { klog.Warningf("cannot list node names. Kubernetes API unavailable: %v", err) @@ -200,22 +208,6 @@ func instanceOutputTable(instances []*cloudinstances.CloudInstance, out io.Write return t.Render(instances, out, columns...) } -func createK8sClient(cluster *kops.Cluster) (*kubernetes.Clientset, error) { - contextName := cluster.ObjectMeta.Name - clientGetter := genericclioptions.NewConfigFlags(true) - clientGetter.Context = &contextName - - config, err := clientGetter.ToRESTConfig() - if err != nil { - return nil, fmt.Errorf("cannot load kubecfg settings for %q: %v", contextName, err) - } - k8sClient, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, fmt.Errorf("cannot build kubernetes api client for %q: %v", contextName, err) - } - return k8sClient, nil -} - func asRenderable(instances []*cloudinstances.CloudInstance) []*renderableCloudInstance { arr := make([]*renderableCloudInstance, len(instances)) for i, ci := range instances { diff --git a/cmd/kops/rolling-update_cluster.go b/cmd/kops/rolling-update_cluster.go index 7cd40abaed..b29740af24 100644 --- a/cmd/kops/rolling-update_cluster.go +++ b/cmd/kops/rolling-update_cluster.go @@ -30,7 +30,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/kops/cmd/kops/util" @@ -238,21 +237,22 @@ func RunRollingUpdateCluster(ctx context.Context, f *util.Factory, out io.Writer return err } - contextName := cluster.ObjectMeta.Name - clientGetter := genericclioptions.NewConfigFlags(true) - clientGetter.Context = &contextName - - config, err := clientGetter.ToRESTConfig() - if err != nil { - return fmt.Errorf("cannot load kubecfg settings for %q: %v", contextName, err) - } - var nodes []v1.Node var k8sClient kubernetes.Interface if !options.CloudOnly { - k8sClient, err = kubernetes.NewForConfig(config) + restConfig, err := f.RESTConfig(cluster) if err != nil { - return fmt.Errorf("cannot build kube client for %q: %v", contextName, err) + return fmt.Errorf("getting rest config: %w", err) + } + + httpClient, err := f.HTTPClient(cluster) + if err != nil { + return fmt.Errorf("getting http client: %w", err) + } + + k8sClient, err = kubernetes.NewForConfigAndClient(restConfig, httpClient) + if err != nil { + return fmt.Errorf("getting kubernetes client: %w", err) } nodeList, err := k8sClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) @@ -449,7 +449,12 @@ func RunRollingUpdateCluster(ctx context.Context, f *util.Factory, out io.Writer var clusterValidator validation.ClusterValidator if !options.CloudOnly { - clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, config.Host, k8sClient) + restConfig, err := f.RESTConfig(cluster) + if err != nil { + return fmt.Errorf("getting rest config: %w", err) + } + + clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, restConfig, k8sClient) if err != nil { return fmt.Errorf("cannot create cluster validator: %v", err) } diff --git a/cmd/kops/toolbox_addons.go b/cmd/kops/toolbox_addons.go index e06ac7c0a8..c522924c1e 100644 --- a/cmd/kops/toolbox_addons.go +++ b/cmd/kops/toolbox_addons.go @@ -17,11 +17,9 @@ limitations under the License. package main import ( - "context" "io" channelscmd "k8s.io/kops/channels/pkg/cmd" - "k8s.io/kops/cmd/kops/util" "github.com/spf13/cobra" ) @@ -34,8 +32,7 @@ func NewCmdToolboxAddons(out io.Writer) *cobra.Command { SilenceUsage: true, } - f := util.NewFactory(nil) - ctx := context.Background() + f := channelscmd.NewChannelsFactory() // create subcommands cmd.AddCommand(&cobra.Command{ @@ -43,6 +40,7 @@ func NewCmdToolboxAddons(out io.Writer) *cobra.Command { Short: "Applies updates from the given channel", Example: "kops toolbox addons apply s3:////addons/bootstrap-channel.yaml", RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() return channelscmd.RunApplyChannel(ctx, f, out, &channelscmd.ApplyChannelOptions{}, args) }, }) @@ -50,6 +48,7 @@ func NewCmdToolboxAddons(out io.Writer) *cobra.Command { Use: "list", Short: "Lists installed addons", RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() return channelscmd.RunGetAddons(ctx, f, out, &channelscmd.GetAddonsOptions{}) }, }) diff --git a/cmd/kops/toolbox_dump.go b/cmd/kops/toolbox_dump.go index b86f0a678a..ab205c9696 100644 --- a/cmd/kops/toolbox_dump.go +++ b/cmd/kops/toolbox_dump.go @@ -176,6 +176,7 @@ func RunToolboxDump(ctx context.Context, f commandutils.Factory, out io.Writer, var nodes corev1.NodeList + // TODO: We should use the factory to get the kubeconfig kubeConfig, err := clientGetter.ToRESTConfig() if err != nil { klog.Warningf("cannot load kubeconfig settings for %q: %v", contextName, err) diff --git a/cmd/kops/util/factory.go b/cmd/kops/util/factory.go index 753999f486..e173a3de6e 100644 --- a/cmd/kops/util/factory.go +++ b/cmd/kops/util/factory.go @@ -18,20 +18,20 @@ package util import ( "fmt" + "net/http" "net/url" "strings" + "sync" - certmanager "github.com/cert-manager/cert-manager/pkg/client/clientset/versioned" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/client-go/dynamic" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - "k8s.io/client-go/restmapper" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" - channelscmd "k8s.io/kops/channels/pkg/cmd" + gceacls "k8s.io/kops/pkg/acls/gce" + "k8s.io/kops/pkg/apis/kops" kopsclient "k8s.io/kops/pkg/client/clientset_generated/clientset" "k8s.io/kops/pkg/client/simple" "k8s.io/kops/pkg/client/simple/api" @@ -44,24 +44,36 @@ type FactoryOptions struct { } type Factory struct { - ConfigFlags genericclioptions.ConfigFlags - options *FactoryOptions - clientset simple.Clientset + options *FactoryOptions + clientset simple.Clientset - kubernetesClient kubernetes.Interface - certManagerClient certmanager.Interface - vfsContext *vfs.VFSContext + vfsContext *vfs.VFSContext - cachedRESTConfig *rest.Config - dynamicClient dynamic.Interface - restMapper *restmapper.DeferredDiscoveryRESTMapper + // mutex protects access to the clusters map + mutex sync.Mutex + // clusters holds REST connection configuration for connecting to clusters + clusters map[string]*clusterInfo +} + +// clusterInfo holds REST connection configuration for connecting to a cluster +type clusterInfo struct { + clusterName string + + cachedHTTPClient *http.Client + cachedRESTConfig *rest.Config + cachedDynamicClient dynamic.Interface } func NewFactory(options *FactoryOptions) *Factory { gceacls.Register() + if options == nil { + options = &FactoryOptions{} + } + return &Factory{ - options: options, + options: options, + clusters: make(map[string]*clusterInfo), } } @@ -143,14 +155,38 @@ func (f *Factory) KopsStateStore() string { return f.options.RegistryPath } -var _ channelscmd.Factory = &Factory{} +func (f *Factory) getClusterInfo(clusterName string) *clusterInfo { + f.mutex.Lock() + defer f.mutex.Unlock() -func (f *Factory) restConfig() (*rest.Config, error) { + if clusterInfo, ok := f.clusters[clusterName]; ok { + return clusterInfo + } + clusterInfo := &clusterInfo{} + f.clusters[clusterName] = clusterInfo + return clusterInfo +} + +func (f *Factory) RESTConfig(cluster *kops.Cluster) (*rest.Config, error) { + clusterInfo := f.getClusterInfo(cluster.ObjectMeta.Name) + return clusterInfo.RESTConfig() +} + +func (f *clusterInfo) RESTConfig() (*rest.Config, error) { if f.cachedRESTConfig == nil { - restConfig, err := f.ConfigFlags.ToRESTConfig() - if err != nil { - return nil, fmt.Errorf("cannot load kubecfg settings: %w", err) + // Get the kubeconfig from the context + + clientGetter := genericclioptions.NewConfigFlags(true) + if f.clusterName != "" { + contextName := f.clusterName + clientGetter.Context = &contextName } + + restConfig, err := clientGetter.ToRESTConfig() + if err != nil { + return nil, fmt.Errorf("loading kubecfg settings for %q: %w", f.clusterName, err) + } + restConfig.UserAgent = "kops" restConfig.Burst = 50 restConfig.QPS = 20 @@ -159,67 +195,51 @@ func (f *Factory) restConfig() (*rest.Config, error) { return f.cachedRESTConfig, nil } -func (f *Factory) KubernetesClient() (kubernetes.Interface, error) { - if f.kubernetesClient == nil { - restConfig, err := f.restConfig() +func (f *Factory) HTTPClient(cluster *kops.Cluster) (*http.Client, error) { + clusterInfo := f.getClusterInfo(cluster.ObjectMeta.Name) + return clusterInfo.HTTPClient() +} + +func (f *clusterInfo) HTTPClient() (*http.Client, error) { + if f.cachedHTTPClient == nil { + restConfig, err := f.RESTConfig() if err != nil { return nil, err } - k8sClient, err := kubernetes.NewForConfig(restConfig) + httpClient, err := rest.HTTPClientFor(restConfig) if err != nil { - return nil, fmt.Errorf("cannot build kube client: %w", err) + return nil, fmt.Errorf("building http client: %w", err) } - f.kubernetesClient = k8sClient + f.cachedHTTPClient = httpClient } - - return f.kubernetesClient, nil + return f.cachedHTTPClient, nil } -func (f *Factory) DynamicClient() (dynamic.Interface, error) { - if f.dynamicClient == nil { - restConfig, err := f.restConfig() - if err != nil { - return nil, fmt.Errorf("cannot load kubecfg settings: %w", err) - } - dynamicClient, err := dynamic.NewForConfig(restConfig) - if err != nil { - return nil, fmt.Errorf("cannot build dynamicClient client: %v", err) - } - f.dynamicClient = dynamicClient - } - - return f.dynamicClient, nil +// DynamicClient returns a dynamic client +func (f *Factory) DynamicClient(clusterName string) (dynamic.Interface, error) { + clusterInfo := f.getClusterInfo(clusterName) + return clusterInfo.DynamicClient() } -func (f *Factory) CertManagerClient() (certmanager.Interface, error) { - if f.certManagerClient == nil { - restConfig, err := f.restConfig() - if err != nil { - return nil, err - } - certManagerClient, err := certmanager.NewForConfig(restConfig) - if err != nil { - return nil, fmt.Errorf("cannot build kube client: %v", err) - } - f.certManagerClient = certManagerClient - } - - return f.certManagerClient, nil -} - -func (f *Factory) RESTMapper() (*restmapper.DeferredDiscoveryRESTMapper, error) { - if f.restMapper == nil { - discoveryClient, err := f.ConfigFlags.ToDiscoveryClient() +func (f *clusterInfo) DynamicClient() (dynamic.Interface, error) { + if f.cachedDynamicClient == nil { + restConfig, err := f.RESTConfig() if err != nil { return nil, err } - restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) + httpClient, err := f.HTTPClient() + if err != nil { + return nil, err + } - f.restMapper = restMapper + dynamicClient, err := dynamic.NewForConfigAndClient(restConfig, httpClient) + if err != nil { + return nil, fmt.Errorf("building dynamic client: %w", err) + } + f.cachedDynamicClient = dynamicClient } - - return f.restMapper, nil + return f.cachedDynamicClient, nil } func (f *Factory) VFSContext() *vfs.VFSContext { diff --git a/cmd/kops/validate_cluster.go b/cmd/kops/validate_cluster.go index 7960684587..656d53ecdf 100644 --- a/cmd/kops/validate_cluster.go +++ b/cmd/kops/validate_cluster.go @@ -25,6 +25,7 @@ import ( "strings" "time" + "k8s.io/client-go/kubernetes" "k8s.io/kops/pkg/commands/commandutils" "k8s.io/kops/upup/pkg/fi/cloudup" "k8s.io/kubectl/pkg/util/i18n" @@ -33,8 +34,6 @@ import ( "github.com/spf13/cobra" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" "k8s.io/kops/cmd/kops/util" kopsapi "k8s.io/kops/pkg/apis/kops" @@ -148,27 +147,37 @@ func RunValidateCluster(ctx context.Context, f *util.Factory, out io.Writer, opt return nil, fmt.Errorf("no InstanceGroup objects found") } - // TODO: Refactor into util.Factory - contextName := cluster.ObjectMeta.Name - configLoadingRules := clientcmd.NewDefaultClientConfigLoadingRules() - if options.kubeconfig != "" { - configLoadingRules.ExplicitPath = options.kubeconfig - } - config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( - configLoadingRules, - &clientcmd.ConfigOverrides{CurrentContext: contextName}).ClientConfig() + // // TODO: Refactor into util.Factory + // contextName := cluster.ObjectMeta.Name + // configLoadingRules := clientcmd.NewDefaultClientConfigLoadingRules() + // if options.kubeconfig != "" { + // configLoadingRules.ExplicitPath = options.kubeconfig + // } + // config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( + // configLoadingRules, + // &clientcmd.ConfigOverrides{CurrentContext: contextName}).ClientConfig() + // if err != nil { + // return nil, fmt.Errorf("cannot load kubecfg settings for %q: %v", contextName, err) + // } + + restConfig, err := f.RESTConfig(cluster) if err != nil { - return nil, fmt.Errorf("cannot load kubecfg settings for %q: %v", contextName, err) + return nil, fmt.Errorf("getting rest config: %w", err) } - k8sClient, err := kubernetes.NewForConfig(config) + httpClient, err := f.HTTPClient(cluster) if err != nil { - return nil, fmt.Errorf("cannot build kubernetes api client for %q: %v", contextName, err) + return nil, fmt.Errorf("getting http client: %w", err) + } + + k8sClient, err := kubernetes.NewForConfigAndClient(restConfig, httpClient) + if err != nil { + return nil, fmt.Errorf("building kubernetes client: %w", err) } timeout := time.Now().Add(options.wait) - validator, err := validation.NewClusterValidator(cluster, cloud, list, config.Host, k8sClient) + validator, err := validation.NewClusterValidator(cluster, cloud, list, restConfig, k8sClient) if err != nil { return nil, fmt.Errorf("unexpected error creating validatior: %v", err) } diff --git a/pkg/commands/commandutils/factory.go b/pkg/commands/commandutils/factory.go index 59c331ddc5..4436e07ccf 100644 --- a/pkg/commands/commandutils/factory.go +++ b/pkg/commands/commandutils/factory.go @@ -17,6 +17,9 @@ limitations under the License. package commandutils import ( + "k8s.io/client-go/rest" + + "k8s.io/kops/pkg/apis/kops" "k8s.io/kops/pkg/client/simple" "k8s.io/kops/util/pkg/vfs" ) @@ -24,4 +27,5 @@ import ( type Factory interface { KopsClient() (simple.Clientset, error) VFSContext() *vfs.VFSContext + RESTConfig(cluster *kops.Cluster) (*rest.Config, error) } diff --git a/pkg/commands/toolbox_enroll.go b/pkg/commands/toolbox_enroll.go index 7288357500..1b3af073a4 100644 --- a/pkg/commands/toolbox_enroll.go +++ b/pkg/commands/toolbox_enroll.go @@ -37,7 +37,6 @@ import ( "golang.org/x/crypto/ssh/agent" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/client-go/rest" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" @@ -110,14 +109,9 @@ func RunToolboxEnroll(ctx context.Context, f commandutils.Factory, out io.Writer // Enroll the node over SSH. if options.Host != "" { - // TODO: This is the pattern we use a lot, but should we try to access it directly? - contextName := fullCluster.ObjectMeta.Name - clientGetter := genericclioptions.NewConfigFlags(true) - clientGetter.Context = &contextName - - restConfig, err := clientGetter.ToRESTConfig() + restConfig, err := f.RESTConfig(fullCluster) if err != nil { - return fmt.Errorf("cannot load kubecfg settings for %q: %w", contextName, err) + return err } if err := enrollHost(ctx, fullInstanceGroup, options, bootstrapData, restConfig); err != nil { diff --git a/pkg/instancegroups/rollingupdate.go b/pkg/instancegroups/rollingupdate.go index f361d3db2d..46ff464dd0 100644 --- a/pkg/instancegroups/rollingupdate.go +++ b/pkg/instancegroups/rollingupdate.go @@ -25,11 +25,11 @@ import ( "time" "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/kops/pkg/client/simple" - "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" + api "k8s.io/kops/pkg/apis/kops" + "k8s.io/kops/pkg/client/simple" "k8s.io/kops/pkg/cloudinstances" "k8s.io/kops/pkg/validation" "k8s.io/kops/upup/pkg/fi" diff --git a/pkg/validation/validate_cluster.go b/pkg/validation/validate_cluster.go index 71ead73a06..b21e93b308 100644 --- a/pkg/validation/validate_cluster.go +++ b/pkg/validation/validate_cluster.go @@ -25,6 +25,7 @@ import ( "strings" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/pager" "k8s.io/kops/pkg/apis/kops" "k8s.io/kops/upup/pkg/fi" @@ -62,7 +63,7 @@ type clusterValidatorImpl struct { cluster *kops.Cluster cloud fi.Cloud instanceGroups []*kops.InstanceGroup - host string + restConfig *rest.Config k8sClient kubernetes.Interface } @@ -100,7 +101,7 @@ func hasPlaceHolderIP(host string) (string, error) { return "", nil } -func NewClusterValidator(cluster *kops.Cluster, cloud fi.Cloud, instanceGroupList *kops.InstanceGroupList, host string, k8sClient kubernetes.Interface) (ClusterValidator, error) { +func NewClusterValidator(cluster *kops.Cluster, cloud fi.Cloud, instanceGroupList *kops.InstanceGroupList, restConfig *rest.Config, k8sClient kubernetes.Interface) (ClusterValidator, error) { var instanceGroups []*kops.InstanceGroup for i := range instanceGroupList.Items { @@ -116,7 +117,7 @@ func NewClusterValidator(cluster *kops.Cluster, cloud fi.Cloud, instanceGroupLis cluster: cluster, cloud: cloud, instanceGroups: instanceGroups, - host: host, + restConfig: restConfig, k8sClient: k8sClient, }, nil } @@ -133,7 +134,7 @@ func (v *clusterValidatorImpl) Validate() (*ValidationCluster, error) { dnsProvider = kops.ExternalDNSProviderExternalDNS } - hasPlaceHolderIPAddress, err := hasPlaceHolderIP(v.host) + hasPlaceHolderIPAddress, err := hasPlaceHolderIP(v.restConfig.Host) if err != nil { return nil, err } diff --git a/pkg/validation/validate_cluster_test.go b/pkg/validation/validate_cluster_test.go index c3ccf96d6e..821c692b55 100644 --- a/pkg/validation/validate_cluster_test.go +++ b/pkg/validation/validate_cluster_test.go @@ -26,6 +26,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/rest" kopsapi "k8s.io/kops/pkg/apis/kops" "k8s.io/kops/pkg/cloudinstances" "k8s.io/kops/upup/pkg/fi" @@ -126,7 +127,10 @@ func testValidate(t *testing.T, groups map[string]*cloudinstances.CloudInstanceG mockcloud := BuildMockCloud(t, groups, cluster, instanceGroups) - validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, "https://api.testcluster.k8s.local", fake.NewSimpleClientset(objects...)) + restConfig := &rest.Config{ + Host: "https://api.testcluster.k8s.local", + } + validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, restConfig, fake.NewSimpleClientset(objects...)) if err != nil { return nil, err } @@ -156,7 +160,10 @@ func Test_ValidateCloudGroupMissing(t *testing.T) { mockcloud := BuildMockCloud(t, nil, cluster, instanceGroups) - validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, "https://api.testcluster.k8s.local", fake.NewSimpleClientset()) + restConfig := &rest.Config{ + Host: "https://api.testcluster.k8s.local", + } + validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, restConfig, fake.NewSimpleClientset()) require.NoError(t, err) v, err := validator.Validate() require.NoError(t, err)