Merge pull request #16196 from rifelpet/dumpk8s

Parallelize k8s resource dumps with kops toolbox dump
Kubernetes Prow Robot 2023-12-31 09:27:31 +01:00 committed by GitHub
commit e37355cc73
4 changed files with 284 additions and 7 deletions
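
With this change, `kops toolbox dump` can also dump the cluster's Kubernetes API resources, enabled either by the new `--k8s-resources` flag or by setting `KOPS_TOOLBOX_DUMP_K8S_RESOURCES` in the environment. A usage sketch (the cluster name and target directory are illustrative):

```
kops toolbox dump my-cluster.example.com --dir /tmp/cluster-dump --k8s-resources
# equivalently, via the environment variable read at startup:
KOPS_TOOLBOX_DUMP_K8S_RESOURCES=1 kops toolbox dump my-cluster.example.com --dir /tmp/cluster-dump
```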


@@ -56,6 +56,8 @@ var (
`))
toolboxDumpShort = i18n.T(`Dump cluster information`)
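// k8sResources mirrors the KOPS_TOOLBOX_DUMP_K8S_RESOURCES environment variable and is used to default the --k8s-resources flag.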
k8sResources = os.Getenv("KOPS_TOOLBOX_DUMP_K8S_RESOURCES")
)
type ToolboxDumpOptions struct {
@@ -63,10 +65,11 @@ type ToolboxDumpOptions struct {
ClusterName string
Dir string
PrivateKey string
SSHUser string
MaxNodes int
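// K8sResources enables dumping Kubernetes API resources into the target directory.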
K8sResources bool
}
func (o *ToolboxDumpOptions) InitDefaults() {
@@ -74,6 +77,7 @@ func (o *ToolboxDumpOptions) InitDefaults() {
o.PrivateKey = "~/.ssh/id_rsa"
o.SSHUser = "ubuntu"
o.MaxNodes = 500
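// Default --k8s-resources from the environment so harnesses such as kubetest2 can opt in without passing flags.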
o.K8sResources = k8sResources != ""
}
func NewCmdToolboxDump(f commandutils.Factory, out io.Writer) *cobra.Command {
@@ -99,6 +103,7 @@ func NewCmdToolboxDump(f commandutils.Factory, out io.Writer) *cobra.Command {
cmd.Flags().StringVar(&options.Dir, "dir", options.Dir, "Target directory; if specified will collect logs and other information.")
cmd.MarkFlagDirname("dir")
cmd.Flags().BoolVar(&options.K8sResources, "k8s-resources", options.K8sResources, "Include k8s resources in the dump")
cmd.Flags().IntVar(&options.MaxNodes, "max-nodes", options.MaxNodes, "The maximum number of nodes from which to dump logs")
cmd.Flags().StringVar(&options.PrivateKey, "private-key", options.PrivateKey, "File containing private key to use for SSH access to instances")
cmd.Flags().StringVar(&options.SSHUser, "ssh-user", options.SSHUser, "The remote user for SSH access to instances")
@@ -222,6 +227,15 @@ func RunToolboxDump(ctx context.Context, f commandutils.Factory, out io.Writer,
if err := dumper.DumpAllNodes(ctx, nodes, additionalIPs, additionalPrivateIPs); err != nil {
return fmt.Errorf("error dumping nodes: %v", err)
}
if options.K8sResources {
resourceDumper, err := dump.NewResourceDumper(config, options.Output, options.Dir)
if err != nil {
return fmt.Errorf("error creating resource dumper: %w", err)
}
if err := resourceDumper.DumpResources(ctx); err != nil {
return fmt.Errorf("error dumping resources: %w", err)
}
}
}
switch options.Output {

View File

@@ -25,6 +25,7 @@ kops toolbox dump [CLUSTER] [flags]
```
--dir string Target directory; if specified will collect logs and other information.
-h, --help help for dump
--k8s-resources Include k8s resources in the dump
--max-nodes int The maximum number of nodes from which to dump logs (default 500)
-o, --output string Output format. One of json or yaml (default "yaml")
--private-key string File containing private key to use for SSH access to instances (default "~/.ssh/id_rsa")

pkg/dump/resourcedumper.go (new file)

@@ -0,0 +1,253 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dump
import (
"context"
"errors"
"fmt"
"os"
"path"
"slices"
v1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"sigs.k8s.io/yaml"
)
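// resourceDumpConcurrency caps how many resource lists are fetched and written in parallel.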
const (
resourceDumpConcurrency = 20
)
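// ignoredResources are never dumped: they are either low-value or, in the case of secrets, sensitive.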
var (
ignoredResources = map[string]struct{}{
"componentstatuses": {},
"podtemplates": {},
"replicationcontrollers": {},
"secrets": {}, // Avoid leaking secrets
"controllerrevisions": {},
}
)
type gvrNamespace struct {
namespace string
gvr schema.GroupVersionResource
}
func (d *gvrNamespace) String() string {
return path.Join(d.namespace, d.gvr.Resource)
}
type resourceDumper struct {
k8sConfig *rest.Config
dynamicClient *dynamic.DynamicClient
output string
artifactsDir string
}
type resourceDumpResult struct {
err error
}
func NewResourceDumper(k8sConfig *rest.Config, output, artifactsDir string) (*resourceDumper, error) {
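// Raise the client-side rate limits; the dump issues many LIST requests concurrently.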
k8sConfig.QPS = 50
k8sConfig.Burst = 100
dynamicClient, err := dynamic.NewForConfig(k8sConfig)
if err != nil {
return nil, fmt.Errorf("creating dynamic client: %w", err)
}
return &resourceDumper{
k8sConfig: k8sConfig,
dynamicClient: dynamicClient,
output: output,
artifactsDir: artifactsDir,
}, nil
}
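// DumpResources discovers every listable resource type, fans the per-namespace list calls out to a pool of workers, and writes one file per resource/namespace pair under <artifactsDir>/cluster-info.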
func (d *resourceDumper) DumpResources(ctx context.Context) error {
klog.Info("Dumping k8s resources")
clientSet, err := kubernetes.NewForConfig(d.k8sConfig)
if err != nil {
return fmt.Errorf("creating clientset: %w", err)
}
namespaces, err := clientSet.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("listing namespaces: %w", err)
}
discoveryClient, err := discovery.NewDiscoveryClientForConfig(d.k8sConfig)
if err != nil {
return fmt.Errorf("creating discovery client: %w", err)
}
resourceLists, err := discoveryClient.ServerPreferredResources()
if err != nil {
return fmt.Errorf("listing server preferred resources: %w", err)
}
gvrNamespaces, err := getGVRNamespaces(resourceLists, namespaces.Items)
if err != nil {
return fmt.Errorf("getting GVR namespaces: %w", err)
}
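// Buffer both channels to the full job count so that neither enqueueing jobs nor reporting results can block.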
jobs := make(chan gvrNamespace, len(gvrNamespaces))
results := make(chan resourceDumpResult, len(gvrNamespaces))
for i := 0; i < resourceDumpConcurrency; i++ {
go d.dumpGVRNamespaces(ctx, jobs, results)
}
var dumpErr error
for _, gvrn := range gvrNamespaces {
jobs <- gvrn
}
close(jobs)
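// Closing jobs lets each worker's range loop exit once the queue drains.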
for i := 0; i < len(gvrNamespaces); i++ {
result := <-results
if result.err != nil {
dumpErr = errors.Join(dumpErr, result.err)
}
}
close(results)
return dumpErr
}
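// getGVRNamespaces expands the discovered API resources into dump jobs: one per namespace for namespaced resources, a single cluster-scoped job otherwise.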
func getGVRNamespaces(resourceLists []*metav1.APIResourceList, namespaces []v1.Namespace) ([]gvrNamespace, error) {
gvrNamespaces := make([]gvrNamespace, 0)
for _, resourceList := range resourceLists {
gv, err := schema.ParseGroupVersion(resourceList.GroupVersion)
if err != nil {
return nil, err
}
for _, apiResource := range resourceList.APIResources {
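// Skip ignored resource types and any that do not support the list verb.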
if _, ok := ignoredResources[apiResource.Name]; ok || !slices.Contains(apiResource.Verbs, "list") {
continue
}
if apiResource.Namespaced {
for _, ns := range namespaces {
gvrNamespaces = append(gvrNamespaces, gvrNamespace{
gvr: schema.GroupVersionResource{
Group: gv.Group,
Version: gv.Version,
Resource: apiResource.Name,
},
namespace: ns.Name,
})
}
} else {
gvrNamespaces = append(gvrNamespaces, gvrNamespace{
gvr: schema.GroupVersionResource{
Group: gv.Group,
Version: gv.Version,
Resource: apiResource.Name,
},
})
}
}
}
return gvrNamespaces, nil
}
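// dumpGVRNamespaces is the worker loop: it lists each job's resources, strips noise, writes the result to disk, and sends exactly one result per job so DumpResources can count completions.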
func (d *resourceDumper) dumpGVRNamespaces(ctx context.Context, jobs chan gvrNamespace, results chan resourceDumpResult) {
for job := range jobs {
var lister dynamic.ResourceInterface
if job.namespace != "" {
lister = d.dynamicClient.Resource(job.gvr).Namespace(job.namespace)
} else {
lister = d.dynamicClient.Resource(job.gvr)
}
resourceList, err := lister.List(ctx, metav1.ListOptions{})
if err != nil {
var statusErr *k8sErrors.StatusError
if errors.As(err, &statusErr) && statusErr.ErrStatus.Code >= 400 && statusErr.ErrStatus.Code < 500 {
// Client errors (e.g. 403 Forbidden) are expected for some types; skip them, but still send a result so the completion count in DumpResources stays accurate.
results <- resourceDumpResult{}
continue
}
results <- resourceDumpResult{
err: fmt.Errorf("listing resources for %v: %w", job, err),
}
continue
}
resPath := path.Join(d.artifactsDir, "cluster-info", fmt.Sprintf("%v.%v", job.String(), d.output))
if err := os.MkdirAll(path.Dir(resPath), 0755); err != nil {
results <- resourceDumpResult{
err: fmt.Errorf("creating directory %q: %w", path.Dir(resPath), err),
}
continue
}
// Strip server-side managed fields to keep the dumped manifests readable.
err = resourceList.EachListItem(func(obj runtime.Object) error {
o, err := meta.Accessor(obj)
if err != nil {
return err
}
o.SetManagedFields(nil)
return nil
})
if err != nil {
results <- resourceDumpResult{
err: fmt.Errorf("stripping managed fields for %v: %w", job, err),
}
continue
}
contents, err := resourceList.MarshalJSON()
if err != nil {
results <- resourceDumpResult{
err: fmt.Errorf("marshaling to json for %v: %w", job, err),
}
continue
}
if d.output == "yaml" {
contents, err = yaml.JSONToYAML(contents)
if err != nil {
results <- resourceDumpResult{
err: fmt.Errorf("marshaling to yaml for %v: %w", job, err),
}
continue
}
}
// Write in a single call; a long-lived *os.File would leak on the error paths above.
if err := os.WriteFile(resPath, contents, 0644); err != nil {
results <- resourceDumpResult{
err: fmt.Errorf("writing resources for %v: %w", job, err),
}
continue
}
results <- resourceDumpResult{}
}
}


@@ -25,6 +25,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/kops/pkg/resources"
"k8s.io/kops/tests/e2e/pkg/kops"
"sigs.k8s.io/kubetest2/pkg/exec"
"sigs.k8s.io/yaml"
)
@@ -45,7 +46,7 @@ func (d *deployer) DumpClusterLogs() error {
}
klog.Info(strings.Join(args, " "))
cmd := exec.Command(args[0], args[1:]...)
cmd.SetEnv(d.env()...)
cmd.SetEnv(append(d.env(), "KOPS_TOOLBOX_DUMP_K8S_RESOURCES=1")...)
cmd.SetStdout(yamlFile)
cmd.SetStderr(os.Stderr)
@@ -60,11 +61,19 @@ func (d *deployer) DumpClusterLogs() error {
dumpErr = errors.Join(dumpErr, err)
}
kopsVersion, err := kops.GetVersion(d.KopsBinaryPath)
if err != nil {
klog.Warningf("kops version failed: %v", err)
dumpErr = errors.Join(dumpErr, err)
}
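// kops 1.29+ dumps k8s resources itself via KOPS_TOOLBOX_DUMP_K8S_RESOURCES (set above), so the separate cluster-info dump is only needed for older versions.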
if kopsVersion == "" || kopsVersion < "1.29" {
// TODO: remove when kubetest2-kops stops testing against kops 1.28 and older
if err := d.dumpClusterInfo(); err != nil {
klog.Warningf("cluster info dump failed: %v", err)
dumpErr = errors.Join(dumpErr, err)
}
}
return dumpErr
}