Merge pull request #17154 from justinsb/build_our_own_rest_config

chore: refactor factory to accept a cluster
This commit is contained in:
Kubernetes Prow Robot 2024-12-28 09:08:12 +01:00 committed by GitHub
commit 51db52f025
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 367 additions and 187 deletions

View File

@ -17,20 +17,29 @@ limitations under the License.
package main
import (
"context"
"fmt"
"os"
"k8s.io/klog/v2"
"k8s.io/kops/channels/pkg/cmd"
"k8s.io/kops/cmd/kops/util"
)
func main() {
klog.InitFlags(nil)
f := util.NewFactory(nil)
if err := cmd.Execute(f, os.Stdout); err != nil {
if err := run(context.Background()); err != nil {
fmt.Fprintf(os.Stderr, "\n%v\n", err)
os.Exit(1)
}
}
func run(ctx context.Context) error {
klog.InitFlags(nil)
f := cmd.NewChannelsFactory()
if err := cmd.Execute(ctx, f, os.Stdout); err != nil {
return err
}
return nil
}

View File

@ -22,7 +22,7 @@ import (
"github.com/spf13/cobra"
)
func NewCmdApply(f Factory, out io.Writer) *cobra.Command {
func NewCmdApply(f *ChannelsFactory, out io.Writer) *cobra.Command {
cmd := &cobra.Command{
Use: "apply",
Short: "apply resources from a channel",

View File

@ -25,12 +25,15 @@ import (
"github.com/blang/semver/v4"
"github.com/cert-manager/cert-manager/pkg/client/clientset/versioned"
certmanager "github.com/cert-manager/cert-manager/pkg/client/clientset/versioned"
"github.com/spf13/cobra"
"go.uber.org/multierr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/restmapper"
"k8s.io/kops/channels/pkg/channels"
"k8s.io/kops/util/pkg/tables"
"k8s.io/kops/util/pkg/vfs"
@ -38,9 +41,11 @@ import (
type ApplyChannelOptions struct {
Yes bool
configFlags genericclioptions.ConfigFlags
}
func NewCmdApplyChannel(f Factory, out io.Writer) *cobra.Command {
func NewCmdApplyChannel(f *ChannelsFactory, out io.Writer) *cobra.Command {
var options ApplyChannelOptions
cmd := &cobra.Command{
@ -57,20 +62,29 @@ func NewCmdApplyChannel(f Factory, out io.Writer) *cobra.Command {
return cmd
}
func RunApplyChannel(ctx context.Context, f Factory, out io.Writer, options *ApplyChannelOptions, args []string) error {
k8sClient, err := f.KubernetesClient()
func RunApplyChannel(ctx context.Context, f *ChannelsFactory, out io.Writer, options *ApplyChannelOptions, args []string) error {
restConfig, err := f.RESTConfig()
if err != nil {
return err
}
httpClient, err := f.HTTPClient()
if err != nil {
return err
}
cmClient, err := f.CertManagerClient()
k8sClient, err := kubernetes.NewForConfigAndClient(restConfig, httpClient)
if err != nil {
return err
return fmt.Errorf("building kube client: %w", err)
}
cmClient, err := certmanager.NewForConfigAndClient(restConfig, httpClient)
if err != nil {
return fmt.Errorf("building cert manager client: %w", err)
}
dynamicClient, err := f.DynamicClient()
if err != nil {
return err
return fmt.Errorf("building dynamic client: %w", err)
}
restMapper, err := f.RESTMapper()
@ -92,7 +106,7 @@ func RunApplyChannel(ctx context.Context, f Factory, out io.Writer, options *App
kubernetesVersion.Pre = nil
if len(args) != 1 {
return fmt.Errorf("unexpected number of arguments. Only one channel may be processed at the same time.")
return fmt.Errorf("unexpected number of arguments. Only one channel may be processed at the same time")
}
channelLocation := args[0]

View File

@ -17,20 +17,101 @@ limitations under the License.
package cmd
import (
"fmt"
"net/http"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/kops/util/pkg/vfs"
_ "k8s.io/client-go/plugin/pkg/client/auth"
certmanager "github.com/cert-manager/cert-manager/pkg/client/clientset/versioned"
)
type Factory interface {
VFSContext() *vfs.VFSContext
KubernetesClient() (kubernetes.Interface, error)
CertManagerClient() (certmanager.Interface, error)
RESTMapper() (*restmapper.DeferredDiscoveryRESTMapper, error)
DynamicClient() (dynamic.Interface, error)
type ChannelsFactory struct {
configFlags genericclioptions.ConfigFlags
cachedRESTConfig *rest.Config
cachedHTTPClient *http.Client
vfsContext *vfs.VFSContext
restMapper *restmapper.DeferredDiscoveryRESTMapper
dynamicClient dynamic.Interface
}
func NewChannelsFactory() *ChannelsFactory {
return &ChannelsFactory{}
}
func (f *ChannelsFactory) RESTConfig() (*rest.Config, error) {
if f.cachedRESTConfig == nil {
clientGetter := genericclioptions.NewConfigFlags(true)
restConfig, err := clientGetter.ToRESTConfig()
if err != nil {
return nil, fmt.Errorf("cannot load kubecfg settings: %w", err)
}
restConfig.UserAgent = "kops"
restConfig.Burst = 50
restConfig.QPS = 20
f.cachedRESTConfig = restConfig
}
return f.cachedRESTConfig, nil
}
func (f *ChannelsFactory) HTTPClient() (*http.Client, error) {
if f.cachedHTTPClient == nil {
restConfig, err := f.RESTConfig()
if err != nil {
return nil, err
}
httpClient, err := rest.HTTPClientFor(restConfig)
if err != nil {
return nil, fmt.Errorf("getting http client: %w", err)
}
f.cachedHTTPClient = httpClient
}
return f.cachedHTTPClient, nil
}
func (f *ChannelsFactory) RESTMapper() (*restmapper.DeferredDiscoveryRESTMapper, error) {
if f.restMapper == nil {
discoveryClient, err := f.configFlags.ToDiscoveryClient()
if err != nil {
return nil, err
}
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
f.restMapper = restMapper
}
return f.restMapper, nil
}
func (f *ChannelsFactory) DynamicClient() (dynamic.Interface, error) {
if f.dynamicClient == nil {
restConfig, err := f.RESTConfig()
if err != nil {
return nil, err
}
httpClient, err := f.HTTPClient()
if err != nil {
return nil, err
}
dynamicClient, err := dynamic.NewForConfigAndClient(restConfig, httpClient)
if err != nil {
return nil, err
}
f.dynamicClient = dynamicClient
}
return f.dynamicClient, nil
}
func (f *ChannelsFactory) VFSContext() *vfs.VFSContext {
if f.vfsContext == nil {
// TODO vfs.NewVFSContext()
f.vfsContext = vfs.Context
}
return f.vfsContext
}

View File

@ -22,7 +22,7 @@ import (
"github.com/spf13/cobra"
)
func NewCmdGet(f Factory, out io.Writer) *cobra.Command {
func NewCmdGet(f *ChannelsFactory, out io.Writer) *cobra.Command {
cmd := &cobra.Command{
Use: "get",
SuggestFor: []string{"list"},

View File

@ -41,13 +41,14 @@ import (
"github.com/spf13/cobra"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/kops/channels/pkg/channels"
"k8s.io/kops/util/pkg/tables"
)
type GetAddonsOptions struct{}
func NewCmdGetAddons(f Factory, out io.Writer) *cobra.Command {
func NewCmdGetAddons(f *ChannelsFactory, out io.Writer) *cobra.Command {
var options GetAddonsOptions
cmd := &cobra.Command{
@ -70,11 +71,20 @@ type addonInfo struct {
Namespace *v1.Namespace
}
func RunGetAddons(ctx context.Context, f Factory, out io.Writer, options *GetAddonsOptions) error {
k8sClient, err := f.KubernetesClient()
func RunGetAddons(ctx context.Context, f *ChannelsFactory, out io.Writer, options *GetAddonsOptions) error {
restConfig, err := f.RESTConfig()
if err != nil {
return err
}
httpClient, err := f.HTTPClient()
if err != nil {
return err
}
k8sClient, err := kubernetes.NewForConfigAndClient(restConfig, httpClient)
if err != nil {
return fmt.Errorf("building kube client: %w", err)
}
namespaces, err := k8sClient.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
if err != nil {

View File

@ -17,6 +17,7 @@ limitations under the License.
package cmd
import (
"context"
goflag "flag"
"fmt"
"io"
@ -29,17 +30,17 @@ type CmdRootOptions struct {
configFile string
}
func Execute(f Factory, out io.Writer) error {
func Execute(ctx context.Context, f *ChannelsFactory, out io.Writer) error {
cobra.OnInitialize(initConfig)
cmd := NewCmdRoot(f, out)
goflag.Set("logtostderr", "true")
goflag.CommandLine.Parse([]string{})
return cmd.Execute()
return cmd.ExecuteContext(ctx)
}
func NewCmdRoot(f Factory, out io.Writer) *cobra.Command {
func NewCmdRoot(f *ChannelsFactory, out io.Writer) *cobra.Command {
options := &CmdRootOptions{}
cmd := &cobra.Command{

View File

@ -30,6 +30,7 @@ import (
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/kops/cmd/kops/util"
kopsapi "k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/cloudinstances"
@ -163,11 +164,28 @@ func RunDeleteInstance(ctx context.Context, f *util.Factory, out io.Writer, opti
return err
}
var nodes []v1.Node
var k8sClient kubernetes.Interface
var host string
var restConfig *rest.Config
if !options.CloudOnly {
k8sClient, host, nodes, err = getNodes(ctx, cluster, true)
restConfig, err = f.RESTConfig(cluster)
if err != nil {
return fmt.Errorf("getting rest config: %w", err)
}
httpClient, err := f.HTTPClient(cluster)
if err != nil {
return fmt.Errorf("getting http client: %w", err)
}
k8sClient, err = kubernetes.NewForConfigAndClient(restConfig, httpClient)
if err != nil {
return fmt.Errorf("cannot build kube client: %w", err)
}
}
var nodes []v1.Node
if !options.CloudOnly {
nodes, err = getNodes(ctx, k8sClient, true)
if err != nil {
return err
}
@ -240,7 +258,7 @@ func RunDeleteInstance(ctx context.Context, f *util.Factory, out io.Writer, opti
var clusterValidator validation.ClusterValidator
if !options.CloudOnly {
clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, host, k8sClient)
clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, restConfig, k8sClient)
if err != nil {
return fmt.Errorf("cannot create cluster validator: %v", err)
}
@ -250,37 +268,22 @@ func RunDeleteInstance(ctx context.Context, f *util.Factory, out io.Writer, opti
return d.UpdateSingleInstance(ctx, cloudMember, options.Surge)
}
func getNodes(ctx context.Context, cluster *kopsapi.Cluster, verbose bool) (kubernetes.Interface, string, []v1.Node, error) {
func getNodes(ctx context.Context, kubeClient kubernetes.Interface, verbose bool) ([]v1.Node, error) {
var nodes []v1.Node
var k8sClient kubernetes.Interface
contextName := cluster.ObjectMeta.Name
clientGetter := genericclioptions.NewConfigFlags(true)
clientGetter.Context = &contextName
config, err := clientGetter.ToRESTConfig()
if err != nil {
return nil, "", nil, fmt.Errorf("cannot load kubecfg settings for %q: %v", contextName, err)
}
k8sClient, err = kubernetes.NewForConfig(config)
if err != nil {
return nil, "", nil, fmt.Errorf("cannot build kube client for %q: %v", contextName, err)
}
nodeList, err := k8sClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
nodeList, err := kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
if verbose {
fmt.Fprintf(os.Stderr, "Unable to reach the kubernetes API.\n")
fmt.Fprintf(os.Stderr, "Use --cloudonly to do a deletion without confirming progress with the k8s API\n\n")
}
return nil, "", nil, fmt.Errorf("listing nodes in cluster: %v", err)
return nil, fmt.Errorf("listing nodes in cluster: %v", err)
}
if nodeList != nil {
nodes = nodeList.Items
}
return k8sClient, config.Host, nodes, nil
return nodes, nil
}
func deleteNodeMatch(cloudMember *cloudinstances.CloudInstance, options *DeleteInstanceOptions) bool {
@ -319,15 +322,25 @@ func completeInstanceOrNode(f commandutils.Factory, options *DeleteInstanceOptio
return completions, directive
}
var nodes []v1.Node
var err error
var kubeClient kubernetes.Interface
if !options.CloudOnly {
_, _, nodes, err = getNodes(ctx, cluster, false)
var err error
kubeClient, err = getKubeClientFromKubeconfig(ctx, cluster)
if err != nil {
cobra.CompErrorln(err.Error())
}
}
var nodes []v1.Node
if kubeClient != nil {
nodeList, err := kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
cobra.CompErrorln(err.Error())
} else if nodeList != nil {
nodes = nodeList.Items
}
}
list, err := clientSet.InstanceGroupsFor(cluster).List(ctx, metav1.ListOptions{})
if err != nil {
return commandutils.CompletionError("listing instance groups", err)
@ -368,6 +381,26 @@ func completeInstanceOrNode(f commandutils.Factory, options *DeleteInstanceOptio
}
}
// getKubeClientFromKubeconfig returns a kubernetes client from the kubeconfig,
// assuming it has already been exported. This is not ideal, but is reasonable
// for command completion.
func getKubeClientFromKubeconfig(ctx context.Context, cluster *kopsapi.Cluster) (kubernetes.Interface, error) {
contextName := cluster.ObjectMeta.Name
clientGetter := genericclioptions.NewConfigFlags(true)
clientGetter.Context = &contextName
config, err := clientGetter.ToRESTConfig()
if err != nil {
return nil, fmt.Errorf("cannot load kubecfg settings for %q: %w", contextName, err)
}
k8sClient, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("cannot build kube client for %q: %w", contextName, err)
}
return k8sClient, nil
}
func appendInstance(completions []string, instance *cloudinstances.CloudInstance, longestGroup int) []string {
completion := instance.ID
if instance.CloudInstanceGroup.InstanceGroup != nil {

View File

@ -23,6 +23,7 @@ import (
"io"
"strings"
"k8s.io/client-go/kubernetes"
"k8s.io/kops/pkg/cloudinstances"
"k8s.io/kops/pkg/commands/commandutils"
"k8s.io/kubectl/pkg/util/i18n"
@ -31,10 +32,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/client-go/kubernetes"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/kops/util/pkg/tables"
@ -101,11 +99,21 @@ func RunGetInstances(ctx context.Context, f *util.Factory, out io.Writer, option
return err
}
k8sClient, err := createK8sClient(cluster)
restConfig, err := f.RESTConfig(cluster)
if err != nil {
return err
}
httpClient, err := f.HTTPClient(cluster)
if err != nil {
return err
}
k8sClient, err := kubernetes.NewForConfigAndClient(restConfig, httpClient)
if err != nil {
return fmt.Errorf("building kubernetes client: %w", err)
}
nodeList, err := k8sClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
klog.Warningf("cannot list node names. Kubernetes API unavailable: %v", err)
@ -200,22 +208,6 @@ func instanceOutputTable(instances []*cloudinstances.CloudInstance, out io.Write
return t.Render(instances, out, columns...)
}
func createK8sClient(cluster *kops.Cluster) (*kubernetes.Clientset, error) {
contextName := cluster.ObjectMeta.Name
clientGetter := genericclioptions.NewConfigFlags(true)
clientGetter.Context = &contextName
config, err := clientGetter.ToRESTConfig()
if err != nil {
return nil, fmt.Errorf("cannot load kubecfg settings for %q: %v", contextName, err)
}
k8sClient, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("cannot build kubernetes api client for %q: %v", contextName, err)
}
return k8sClient, nil
}
func asRenderable(instances []*cloudinstances.CloudInstance) []*renderableCloudInstance {
arr := make([]*renderableCloudInstance, len(instances))
for i, ci := range instances {

View File

@ -30,7 +30,6 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/kops/cmd/kops/util"
@ -238,21 +237,22 @@ func RunRollingUpdateCluster(ctx context.Context, f *util.Factory, out io.Writer
return err
}
contextName := cluster.ObjectMeta.Name
clientGetter := genericclioptions.NewConfigFlags(true)
clientGetter.Context = &contextName
config, err := clientGetter.ToRESTConfig()
if err != nil {
return fmt.Errorf("cannot load kubecfg settings for %q: %v", contextName, err)
}
var nodes []v1.Node
var k8sClient kubernetes.Interface
if !options.CloudOnly {
k8sClient, err = kubernetes.NewForConfig(config)
restConfig, err := f.RESTConfig(cluster)
if err != nil {
return fmt.Errorf("cannot build kube client for %q: %v", contextName, err)
return fmt.Errorf("getting rest config: %w", err)
}
httpClient, err := f.HTTPClient(cluster)
if err != nil {
return fmt.Errorf("getting http client: %w", err)
}
k8sClient, err = kubernetes.NewForConfigAndClient(restConfig, httpClient)
if err != nil {
return fmt.Errorf("getting kubernetes client: %w", err)
}
nodeList, err := k8sClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
@ -448,7 +448,12 @@ func RunRollingUpdateCluster(ctx context.Context, f *util.Factory, out io.Writer
var clusterValidator validation.ClusterValidator
if !options.CloudOnly {
clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, config.Host, k8sClient)
restConfig, err := f.RESTConfig(cluster)
if err != nil {
return fmt.Errorf("getting rest config: %w", err)
}
clusterValidator, err = validation.NewClusterValidator(cluster, cloud, list, restConfig, k8sClient)
if err != nil {
return fmt.Errorf("cannot create cluster validator: %v", err)
}

View File

@ -17,11 +17,9 @@ limitations under the License.
package main
import (
"context"
"io"
channelscmd "k8s.io/kops/channels/pkg/cmd"
"k8s.io/kops/cmd/kops/util"
"github.com/spf13/cobra"
)
@ -34,8 +32,7 @@ func NewCmdToolboxAddons(out io.Writer) *cobra.Command {
SilenceUsage: true,
}
f := util.NewFactory(nil)
ctx := context.Background()
f := channelscmd.NewChannelsFactory()
// create subcommands
cmd.AddCommand(&cobra.Command{
@ -43,6 +40,7 @@ func NewCmdToolboxAddons(out io.Writer) *cobra.Command {
Short: "Applies updates from the given channel",
Example: "kops toolbox addons apply s3://<state_store>/<cluster_name>/addons/bootstrap-channel.yaml",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
return channelscmd.RunApplyChannel(ctx, f, out, &channelscmd.ApplyChannelOptions{}, args)
},
})
@ -50,6 +48,7 @@ func NewCmdToolboxAddons(out io.Writer) *cobra.Command {
Use: "list",
Short: "Lists installed addons",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
return channelscmd.RunGetAddons(ctx, f, out, &channelscmd.GetAddonsOptions{})
},
})

View File

@ -176,6 +176,7 @@ func RunToolboxDump(ctx context.Context, f commandutils.Factory, out io.Writer,
var nodes corev1.NodeList
// TODO: We should use the factory to get the kubeconfig
kubeConfig, err := clientGetter.ToRESTConfig()
if err != nil {
klog.Warningf("cannot load kubeconfig settings for %q: %v", contextName, err)

View File

@ -18,20 +18,20 @@ package util
import (
"fmt"
"net/http"
"net/url"
"strings"
"sync"
certmanager "github.com/cert-manager/cert-manager/pkg/client/clientset/versioned"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
channelscmd "k8s.io/kops/channels/pkg/cmd"
gceacls "k8s.io/kops/pkg/acls/gce"
"k8s.io/kops/pkg/apis/kops"
kopsclient "k8s.io/kops/pkg/client/clientset_generated/clientset"
"k8s.io/kops/pkg/client/simple"
"k8s.io/kops/pkg/client/simple/api"
@ -44,24 +44,36 @@ type FactoryOptions struct {
}
type Factory struct {
ConfigFlags genericclioptions.ConfigFlags
options *FactoryOptions
clientset simple.Clientset
options *FactoryOptions
clientset simple.Clientset
kubernetesClient kubernetes.Interface
certManagerClient certmanager.Interface
vfsContext *vfs.VFSContext
vfsContext *vfs.VFSContext
cachedRESTConfig *rest.Config
dynamicClient dynamic.Interface
restMapper *restmapper.DeferredDiscoveryRESTMapper
// mutex protects access to the clusters map
mutex sync.Mutex
// clusters holds REST connection configuration for connecting to clusters
clusters map[string]*clusterInfo
}
// clusterInfo holds REST connection configuration for connecting to a cluster
type clusterInfo struct {
clusterName string
cachedHTTPClient *http.Client
cachedRESTConfig *rest.Config
cachedDynamicClient dynamic.Interface
}
func NewFactory(options *FactoryOptions) *Factory {
gceacls.Register()
if options == nil {
options = &FactoryOptions{}
}
return &Factory{
options: options,
options: options,
clusters: make(map[string]*clusterInfo),
}
}
@ -143,14 +155,38 @@ func (f *Factory) KopsStateStore() string {
return f.options.RegistryPath
}
var _ channelscmd.Factory = &Factory{}
func (f *Factory) getClusterInfo(clusterName string) *clusterInfo {
f.mutex.Lock()
defer f.mutex.Unlock()
func (f *Factory) restConfig() (*rest.Config, error) {
if clusterInfo, ok := f.clusters[clusterName]; ok {
return clusterInfo
}
clusterInfo := &clusterInfo{}
f.clusters[clusterName] = clusterInfo
return clusterInfo
}
func (f *Factory) RESTConfig(cluster *kops.Cluster) (*rest.Config, error) {
clusterInfo := f.getClusterInfo(cluster.ObjectMeta.Name)
return clusterInfo.RESTConfig()
}
func (f *clusterInfo) RESTConfig() (*rest.Config, error) {
if f.cachedRESTConfig == nil {
restConfig, err := f.ConfigFlags.ToRESTConfig()
if err != nil {
return nil, fmt.Errorf("cannot load kubecfg settings: %w", err)
// Get the kubeconfig from the context
clientGetter := genericclioptions.NewConfigFlags(true)
if f.clusterName != "" {
contextName := f.clusterName
clientGetter.Context = &contextName
}
restConfig, err := clientGetter.ToRESTConfig()
if err != nil {
return nil, fmt.Errorf("loading kubecfg settings for %q: %w", f.clusterName, err)
}
restConfig.UserAgent = "kops"
restConfig.Burst = 50
restConfig.QPS = 20
@ -159,67 +195,51 @@ func (f *Factory) restConfig() (*rest.Config, error) {
return f.cachedRESTConfig, nil
}
func (f *Factory) KubernetesClient() (kubernetes.Interface, error) {
if f.kubernetesClient == nil {
restConfig, err := f.restConfig()
func (f *Factory) HTTPClient(cluster *kops.Cluster) (*http.Client, error) {
clusterInfo := f.getClusterInfo(cluster.ObjectMeta.Name)
return clusterInfo.HTTPClient()
}
func (f *clusterInfo) HTTPClient() (*http.Client, error) {
if f.cachedHTTPClient == nil {
restConfig, err := f.RESTConfig()
if err != nil {
return nil, err
}
k8sClient, err := kubernetes.NewForConfig(restConfig)
httpClient, err := rest.HTTPClientFor(restConfig)
if err != nil {
return nil, fmt.Errorf("cannot build kube client: %w", err)
return nil, fmt.Errorf("building http client: %w", err)
}
f.kubernetesClient = k8sClient
f.cachedHTTPClient = httpClient
}
return f.kubernetesClient, nil
return f.cachedHTTPClient, nil
}
func (f *Factory) DynamicClient() (dynamic.Interface, error) {
if f.dynamicClient == nil {
restConfig, err := f.restConfig()
if err != nil {
return nil, fmt.Errorf("cannot load kubecfg settings: %w", err)
}
dynamicClient, err := dynamic.NewForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("cannot build dynamicClient client: %v", err)
}
f.dynamicClient = dynamicClient
}
return f.dynamicClient, nil
// DynamicClient returns a dynamic client
func (f *Factory) DynamicClient(clusterName string) (dynamic.Interface, error) {
clusterInfo := f.getClusterInfo(clusterName)
return clusterInfo.DynamicClient()
}
func (f *Factory) CertManagerClient() (certmanager.Interface, error) {
if f.certManagerClient == nil {
restConfig, err := f.restConfig()
if err != nil {
return nil, err
}
certManagerClient, err := certmanager.NewForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("cannot build kube client: %v", err)
}
f.certManagerClient = certManagerClient
}
return f.certManagerClient, nil
}
func (f *Factory) RESTMapper() (*restmapper.DeferredDiscoveryRESTMapper, error) {
if f.restMapper == nil {
discoveryClient, err := f.ConfigFlags.ToDiscoveryClient()
func (f *clusterInfo) DynamicClient() (dynamic.Interface, error) {
if f.cachedDynamicClient == nil {
restConfig, err := f.RESTConfig()
if err != nil {
return nil, err
}
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
httpClient, err := f.HTTPClient()
if err != nil {
return nil, err
}
f.restMapper = restMapper
dynamicClient, err := dynamic.NewForConfigAndClient(restConfig, httpClient)
if err != nil {
return nil, fmt.Errorf("building dynamic client: %w", err)
}
f.cachedDynamicClient = dynamicClient
}
return f.restMapper, nil
return f.cachedDynamicClient, nil
}
func (f *Factory) VFSContext() *vfs.VFSContext {

View File

@ -25,6 +25,7 @@ import (
"strings"
"time"
"k8s.io/client-go/kubernetes"
"k8s.io/kops/pkg/commands/commandutils"
"k8s.io/kops/upup/pkg/fi/cloudup"
"k8s.io/kubectl/pkg/util/i18n"
@ -33,8 +34,6 @@ import (
"github.com/spf13/cobra"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
"k8s.io/kops/cmd/kops/util"
kopsapi "k8s.io/kops/pkg/apis/kops"
@ -148,27 +147,37 @@ func RunValidateCluster(ctx context.Context, f *util.Factory, out io.Writer, opt
return nil, fmt.Errorf("no InstanceGroup objects found")
}
// TODO: Refactor into util.Factory
contextName := cluster.ObjectMeta.Name
configLoadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
if options.kubeconfig != "" {
configLoadingRules.ExplicitPath = options.kubeconfig
}
config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
configLoadingRules,
&clientcmd.ConfigOverrides{CurrentContext: contextName}).ClientConfig()
// // TODO: Refactor into util.Factory
// contextName := cluster.ObjectMeta.Name
// configLoadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
// if options.kubeconfig != "" {
// configLoadingRules.ExplicitPath = options.kubeconfig
// }
// config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
// configLoadingRules,
// &clientcmd.ConfigOverrides{CurrentContext: contextName}).ClientConfig()
// if err != nil {
// return nil, fmt.Errorf("cannot load kubecfg settings for %q: %v", contextName, err)
// }
restConfig, err := f.RESTConfig(cluster)
if err != nil {
return nil, fmt.Errorf("cannot load kubecfg settings for %q: %v", contextName, err)
return nil, fmt.Errorf("getting rest config: %w", err)
}
k8sClient, err := kubernetes.NewForConfig(config)
httpClient, err := f.HTTPClient(cluster)
if err != nil {
return nil, fmt.Errorf("cannot build kubernetes api client for %q: %v", contextName, err)
return nil, fmt.Errorf("getting http client: %w", err)
}
k8sClient, err := kubernetes.NewForConfigAndClient(restConfig, httpClient)
if err != nil {
return nil, fmt.Errorf("building kubernetes client: %w", err)
}
timeout := time.Now().Add(options.wait)
validator, err := validation.NewClusterValidator(cluster, cloud, list, config.Host, k8sClient)
validator, err := validation.NewClusterValidator(cluster, cloud, list, restConfig, k8sClient)
if err != nil {
return nil, fmt.Errorf("unexpected error creating validatior: %v", err)
}

View File

@ -17,6 +17,9 @@ limitations under the License.
package commandutils
import (
"k8s.io/client-go/rest"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/client/simple"
"k8s.io/kops/util/pkg/vfs"
)
@ -24,4 +27,5 @@ import (
type Factory interface {
KopsClient() (simple.Clientset, error)
VFSContext() *vfs.VFSContext
RESTConfig(cluster *kops.Cluster) (*rest.Config, error)
}

View File

@ -37,7 +37,6 @@ import (
"golang.org/x/crypto/ssh/agent"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
@ -110,14 +109,9 @@ func RunToolboxEnroll(ctx context.Context, f commandutils.Factory, out io.Writer
// Enroll the node over SSH.
if options.Host != "" {
// TODO: This is the pattern we use a lot, but should we try to access it directly?
contextName := fullCluster.ObjectMeta.Name
clientGetter := genericclioptions.NewConfigFlags(true)
clientGetter.Context = &contextName
restConfig, err := clientGetter.ToRESTConfig()
restConfig, err := f.RESTConfig(fullCluster)
if err != nil {
return fmt.Errorf("cannot load kubecfg settings for %q: %w", contextName, err)
return err
}
if err := enrollHost(ctx, fullInstanceGroup, options, bootstrapData, restConfig); err != nil {

View File

@ -25,11 +25,11 @@ import (
"time"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/kops/pkg/client/simple"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
api "k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/client/simple"
"k8s.io/kops/pkg/cloudinstances"
"k8s.io/kops/pkg/validation"
"k8s.io/kops/upup/pkg/fi"

View File

@ -25,6 +25,7 @@ import (
"strings"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/pager"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/upup/pkg/fi"
@ -62,7 +63,7 @@ type clusterValidatorImpl struct {
cluster *kops.Cluster
cloud fi.Cloud
instanceGroups []*kops.InstanceGroup
host string
restConfig *rest.Config
k8sClient kubernetes.Interface
}
@ -100,7 +101,7 @@ func hasPlaceHolderIP(host string) (string, error) {
return "", nil
}
func NewClusterValidator(cluster *kops.Cluster, cloud fi.Cloud, instanceGroupList *kops.InstanceGroupList, host string, k8sClient kubernetes.Interface) (ClusterValidator, error) {
func NewClusterValidator(cluster *kops.Cluster, cloud fi.Cloud, instanceGroupList *kops.InstanceGroupList, restConfig *rest.Config, k8sClient kubernetes.Interface) (ClusterValidator, error) {
var instanceGroups []*kops.InstanceGroup
for i := range instanceGroupList.Items {
@ -116,7 +117,7 @@ func NewClusterValidator(cluster *kops.Cluster, cloud fi.Cloud, instanceGroupLis
cluster: cluster,
cloud: cloud,
instanceGroups: instanceGroups,
host: host,
restConfig: restConfig,
k8sClient: k8sClient,
}, nil
}
@ -133,7 +134,7 @@ func (v *clusterValidatorImpl) Validate() (*ValidationCluster, error) {
dnsProvider = kops.ExternalDNSProviderExternalDNS
}
hasPlaceHolderIPAddress, err := hasPlaceHolderIP(v.host)
hasPlaceHolderIPAddress, err := hasPlaceHolderIP(v.restConfig.Host)
if err != nil {
return nil, err
}

View File

@ -26,6 +26,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
kopsapi "k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/cloudinstances"
"k8s.io/kops/upup/pkg/fi"
@ -126,7 +127,10 @@ func testValidate(t *testing.T, groups map[string]*cloudinstances.CloudInstanceG
mockcloud := BuildMockCloud(t, groups, cluster, instanceGroups)
validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, "https://api.testcluster.k8s.local", fake.NewSimpleClientset(objects...))
restConfig := &rest.Config{
Host: "https://api.testcluster.k8s.local",
}
validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, restConfig, fake.NewSimpleClientset(objects...))
if err != nil {
return nil, err
}
@ -156,7 +160,10 @@ func Test_ValidateCloudGroupMissing(t *testing.T) {
mockcloud := BuildMockCloud(t, nil, cluster, instanceGroups)
validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, "https://api.testcluster.k8s.local", fake.NewSimpleClientset())
restConfig := &rest.Config{
Host: "https://api.testcluster.k8s.local",
}
validator, err := NewClusterValidator(cluster, mockcloud, &kopsapi.InstanceGroupList{Items: instanceGroups}, restConfig, fake.NewSimpleClientset())
require.NoError(t, err)
v, err := validator.Validate()
require.NoError(t, err)