mirror of https://github.com/kubernetes/kops.git
Capture logs from a kops cluster
This is derived from the dumping code in kubetest. If we want to run tests outside of kubetest (e.g. upgrade tests), we're going to need that functionality.
This commit is contained in:
parent
9d675e3f5c
commit
d8b69ab2e3
|
@ -66,6 +66,7 @@ go_library(
|
||||||
"//pkg/client/simple:go_default_library",
|
"//pkg/client/simple:go_default_library",
|
||||||
"//pkg/cloudinstances:go_default_library",
|
"//pkg/cloudinstances:go_default_library",
|
||||||
"//pkg/commands:go_default_library",
|
"//pkg/commands:go_default_library",
|
||||||
|
"//pkg/dump:go_default_library",
|
||||||
"//pkg/edit:go_default_library",
|
"//pkg/edit:go_default_library",
|
||||||
"//pkg/featureflag:go_default_library",
|
"//pkg/featureflag:go_default_library",
|
||||||
"//pkg/formatter:go_default_library",
|
"//pkg/formatter:go_default_library",
|
||||||
|
@ -94,6 +95,7 @@ go_library(
|
||||||
"//vendor/github.com/spf13/cobra:go_default_library",
|
"//vendor/github.com/spf13/cobra:go_default_library",
|
||||||
"//vendor/github.com/spf13/cobra/doc:go_default_library",
|
"//vendor/github.com/spf13/cobra/doc:go_default_library",
|
||||||
"//vendor/github.com/spf13/viper:go_default_library",
|
"//vendor/github.com/spf13/viper:go_default_library",
|
||||||
|
"//vendor/golang.org/x/crypto/ssh:go_default_library",
|
||||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
|
|
|
@ -21,10 +21,18 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
"golang.org/x/crypto/ssh"
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
|
"k8s.io/klog"
|
||||||
"k8s.io/kops/cmd/kops/util"
|
"k8s.io/kops/cmd/kops/util"
|
||||||
"k8s.io/kops/pkg/apis/kops"
|
"k8s.io/kops/pkg/apis/kops"
|
||||||
|
"k8s.io/kops/pkg/dump"
|
||||||
"k8s.io/kops/pkg/resources"
|
"k8s.io/kops/pkg/resources"
|
||||||
resourceops "k8s.io/kops/pkg/resources/ops"
|
resourceops "k8s.io/kops/pkg/resources/ops"
|
||||||
"k8s.io/kops/upup/pkg/fi/cloudup"
|
"k8s.io/kops/upup/pkg/fi/cloudup"
|
||||||
|
@ -48,6 +56,8 @@ type ToolboxDumpOptions struct {
|
||||||
Output string
|
Output string
|
||||||
|
|
||||||
ClusterName string
|
ClusterName string
|
||||||
|
|
||||||
|
Dir string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *ToolboxDumpOptions) InitDefaults() {
|
func (o *ToolboxDumpOptions) InitDefaults() {
|
||||||
|
@ -83,10 +93,13 @@ func NewCmdToolboxDump(f *util.Factory, out io.Writer) *cobra.Command {
|
||||||
// Yes please! (@kris-nova)
|
// Yes please! (@kris-nova)
|
||||||
cmd.Flags().StringVarP(&options.Output, "output", "o", options.Output, "output format. One of: yaml, json")
|
cmd.Flags().StringVarP(&options.Output, "output", "o", options.Output, "output format. One of: yaml, json")
|
||||||
|
|
||||||
|
cmd.Flags().StringVar(&options.Dir, "dir", options.Dir, "target directory; if specified will collect logs and other information.")
|
||||||
|
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
func RunToolboxDump(ctx context.Context, f *util.Factory, out io.Writer, options *ToolboxDumpOptions) error {
|
func RunToolboxDump(ctx context.Context, f *util.Factory, out io.Writer, options *ToolboxDumpOptions) error {
|
||||||
|
|
||||||
clientset, err := f.Clientset()
|
clientset, err := f.Clientset()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -115,14 +128,60 @@ func RunToolboxDump(ctx context.Context, f *util.Factory, out io.Writer, options
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
dump, err := resources.BuildDump(context.TODO(), cloud, resourceMap)
|
d, err := resources.BuildDump(ctx, cloud, resourceMap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if options.Dir != "" {
|
||||||
|
// TODO: Flag?
|
||||||
|
privateKeyPath := "~/.ssh/id_rsa"
|
||||||
|
if strings.HasPrefix(privateKeyPath, "~/") {
|
||||||
|
privateKeyPath = filepath.Join(os.Getenv("HOME"), privateKeyPath[2:])
|
||||||
|
}
|
||||||
|
key, err := ioutil.ReadFile(privateKeyPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error reading private key %q: %v", privateKeyPath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
signer, err := ssh.ParsePrivateKey(key)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error parsing private key %q: %v", privateKeyPath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: We need to find the correct SSH user, ideally per IP
|
||||||
|
sshUser := "admin"
|
||||||
|
sshConfig := &ssh.ClientConfig{
|
||||||
|
User: sshUser,
|
||||||
|
Auth: []ssh.AuthMethod{
|
||||||
|
ssh.PublicKeys(signer),
|
||||||
|
},
|
||||||
|
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
|
||||||
|
}
|
||||||
|
|
||||||
|
dumper := dump.NewLogDumper(sshConfig, options.Dir)
|
||||||
|
|
||||||
|
// TODO: Should we try to get nodes?
|
||||||
|
var nodes corev1.NodeList
|
||||||
|
|
||||||
|
var additionalIPs []string
|
||||||
|
for _, instance := range d.Instances {
|
||||||
|
if len(instance.PublicAddresses) != 0 {
|
||||||
|
additionalIPs = append(additionalIPs, instance.PublicAddresses[0])
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
klog.Warningf("no public IP for node %q", instance.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := dumper.DumpAllNodes(ctx, nodes, additionalIPs); err != nil {
|
||||||
|
return fmt.Errorf("error dumping nodes: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
switch options.Output {
|
switch options.Output {
|
||||||
case OutputYaml:
|
case OutputYaml:
|
||||||
b, err := kops.ToRawYaml(dump)
|
b, err := kops.ToRawYaml(d)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error marshaling yaml: %v", err)
|
return fmt.Errorf("error marshaling yaml: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -133,7 +192,7 @@ func RunToolboxDump(ctx context.Context, f *util.Factory, out io.Writer, options
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
case OutputJSON:
|
case OutputJSON:
|
||||||
b, err := json.MarshalIndent(dump, "", " ")
|
b, err := json.MarshalIndent(d, "", " ")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error marshaling json: %v", err)
|
return fmt.Errorf("error marshaling json: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,13 @@
|
||||||
|
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||||
|
|
||||||
|
go_library(
|
||||||
|
name = "go_default_library",
|
||||||
|
srcs = ["dumper.go"],
|
||||||
|
importpath = "k8s.io/kops/pkg/dump",
|
||||||
|
visibility = ["//visibility:public"],
|
||||||
|
deps = [
|
||||||
|
"//vendor/golang.org/x/crypto/ssh:go_default_library",
|
||||||
|
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||||
|
"//vendor/k8s.io/klog:go_default_library",
|
||||||
|
],
|
||||||
|
)
|
|
@ -0,0 +1,430 @@
|
||||||
|
/*
|
||||||
|
Copyright 2017 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package dump
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"golang.org/x/crypto/ssh"
|
||||||
|
"io"
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
|
"k8s.io/klog"
|
||||||
|
"log"
|
||||||
|
"net"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// logDumper gets all the nodes from a kubernetes cluster and dumps a well-known set of logs
|
||||||
|
type logDumper struct {
|
||||||
|
sshClientFactory sshClientFactory
|
||||||
|
|
||||||
|
artifactsDir string
|
||||||
|
|
||||||
|
services []string
|
||||||
|
files []string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewLogDumper is the constructor for a logDumper
|
||||||
|
func NewLogDumper(sshConfig *ssh.ClientConfig, artifactsDir string) *logDumper {
|
||||||
|
sshClientFactory := &sshClientFactoryImplementation{
|
||||||
|
sshConfig: sshConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
d := &logDumper{
|
||||||
|
sshClientFactory: sshClientFactory,
|
||||||
|
artifactsDir: artifactsDir,
|
||||||
|
}
|
||||||
|
|
||||||
|
d.services = []string{
|
||||||
|
"node-problem-detector",
|
||||||
|
"kubelet",
|
||||||
|
"docker",
|
||||||
|
"kops-configuration",
|
||||||
|
"protokube",
|
||||||
|
}
|
||||||
|
d.files = []string{
|
||||||
|
"kube-apiserver",
|
||||||
|
"kube-scheduler",
|
||||||
|
"rescheduler",
|
||||||
|
"kube-controller-manager",
|
||||||
|
"etcd",
|
||||||
|
"etcd-events",
|
||||||
|
"glbc",
|
||||||
|
"cluster-autoscaler",
|
||||||
|
"kube-addon-manager",
|
||||||
|
"fluentd",
|
||||||
|
"kube-proxy",
|
||||||
|
"node-problem-detector",
|
||||||
|
"cloud-init-output",
|
||||||
|
"startupscript",
|
||||||
|
"kern",
|
||||||
|
"docker",
|
||||||
|
}
|
||||||
|
|
||||||
|
return d
|
||||||
|
}
|
||||||
|
|
||||||
|
// DumpAllNodes connects to every node from kubectl get nodes and dumps the logs.
|
||||||
|
// additionalIPs holds IP addresses of instances found by the deployment tool;
|
||||||
|
// if the IPs are not found from kubectl get nodes, then these will be dumped also.
|
||||||
|
// This allows for dumping log on nodes even if they don't register as a kubernetes
|
||||||
|
// node, or if a node fails to register, or if the whole cluster fails to start.
|
||||||
|
func (d *logDumper) DumpAllNodes(ctx context.Context, nodes corev1.NodeList, additionalIPs []string) error {
|
||||||
|
var dumped []*corev1.Node
|
||||||
|
|
||||||
|
for i := range nodes.Items {
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
log.Printf("stopping dumping nodes: %v", ctx.Err())
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
node := &nodes.Items[i]
|
||||||
|
|
||||||
|
ip := ""
|
||||||
|
for _, address := range node.Status.Addresses {
|
||||||
|
if address.Type == "ExternalIP" {
|
||||||
|
ip = address.Address
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err := d.dumpNode(ctx, node.Name, ip)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("could not dump node %s (%s): %v", node.Name, ip, err)
|
||||||
|
} else {
|
||||||
|
dumped = append(dumped, node)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
notDumped := findInstancesNotDumped(additionalIPs, dumped)
|
||||||
|
for _, ip := range notDumped {
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
log.Printf("stopping dumping nodes: %v", ctx.Err())
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("dumping node not registered in kubernetes: %s", ip)
|
||||||
|
err := d.dumpNode(ctx, ip, ip)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("error dumping node %s: %v", ip, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// findInstancesNotDumped returns ips from the slice that do not appear as any address of the nodes
|
||||||
|
func findInstancesNotDumped(ips []string, dumped []*corev1.Node) []string {
|
||||||
|
var notDumped []string
|
||||||
|
dumpedAddresses := make(map[string]bool)
|
||||||
|
for _, node := range dumped {
|
||||||
|
for _, address := range node.Status.Addresses {
|
||||||
|
dumpedAddresses[address.Address] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, ip := range ips {
|
||||||
|
if !dumpedAddresses[ip] {
|
||||||
|
notDumped = append(notDumped, ip)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return notDumped
|
||||||
|
}
|
||||||
|
|
||||||
|
// DumpNode connects to a node and dumps the logs.
|
||||||
|
func (d *logDumper) dumpNode(ctx context.Context, name string, ip string) error {
|
||||||
|
if ip == "" {
|
||||||
|
return fmt.Errorf("could not find address for %v, ", name)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Dumping node %s", name)
|
||||||
|
|
||||||
|
n, err := d.connectToNode(ctx, name, ip)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not connect: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// As long as we connect to the node we will not return an error;
|
||||||
|
// a failure to collect a log (or even any logs at all) is not
|
||||||
|
// considered an error in dumping the node.
|
||||||
|
// TODO(justinsb): clean up / rationalize
|
||||||
|
errors := n.dump(ctx)
|
||||||
|
for _, e := range errors {
|
||||||
|
log.Printf("error dumping node %s: %v", name, e)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := n.Close(); err != nil {
|
||||||
|
log.Printf("error closing connection: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// sshClient is an interface abstracting *ssh.Client, which allows us to test it
|
||||||
|
type sshClient interface {
|
||||||
|
io.Closer
|
||||||
|
|
||||||
|
// ExecPiped runs the command, piping stdout & stderr
|
||||||
|
ExecPiped(ctx context.Context, command string, stdout io.Writer, stderr io.Writer) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// sshClientFactory is an interface abstracting to a node over SSH
|
||||||
|
type sshClientFactory interface {
|
||||||
|
Dial(ctx context.Context, host string) (sshClient, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// logDumperNode holds state for a particular node we are dumping
|
||||||
|
type logDumperNode struct {
|
||||||
|
client sshClient
|
||||||
|
dumper *logDumper
|
||||||
|
|
||||||
|
dir string
|
||||||
|
}
|
||||||
|
|
||||||
|
// connectToNode makes an SSH connection to the node and returns a logDumperNode
|
||||||
|
func (d *logDumper) connectToNode(ctx context.Context, nodeName string, host string) (*logDumperNode, error) {
|
||||||
|
client, err := d.sshClientFactory.Dial(ctx, host)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("unable to SSH to %q: %v", host, err)
|
||||||
|
}
|
||||||
|
return &logDumperNode{
|
||||||
|
client: client,
|
||||||
|
dir: filepath.Join(d.artifactsDir, nodeName),
|
||||||
|
dumper: d,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// logDumperNode cleans up any state in the logDumperNode
|
||||||
|
func (n *logDumperNode) Close() error {
|
||||||
|
return n.client.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// dump captures the well-known set of logs
|
||||||
|
func (n *logDumperNode) dump(ctx context.Context) []error {
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return []error{ctx.Err()}
|
||||||
|
}
|
||||||
|
|
||||||
|
var errors []error
|
||||||
|
|
||||||
|
// Capture kernel log
|
||||||
|
if err := n.shellToFile(ctx, "sudo journalctl --output=short-precise -k", filepath.Join(n.dir, "kern.log")); err != nil {
|
||||||
|
errors = append(errors, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Capture full journal - needed so we can see e.g. disk mounts
|
||||||
|
// This does duplicate the other files, but ensures we have all output
|
||||||
|
if err := n.shellToFile(ctx, "sudo journalctl --output=short-precise", filepath.Join(n.dir, "journal.log")); err != nil {
|
||||||
|
errors = append(errors, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Capture logs from any systemd services in our list, that are registered
|
||||||
|
services, err := n.listSystemdUnits(ctx)
|
||||||
|
if err != nil {
|
||||||
|
errors = append(errors, fmt.Errorf("error listing systemd services: %v", err))
|
||||||
|
}
|
||||||
|
for _, s := range n.dumper.services {
|
||||||
|
name := s + ".service"
|
||||||
|
for _, service := range services {
|
||||||
|
if service == name {
|
||||||
|
if err := n.shellToFile(ctx, "sudo journalctl --output=cat -u "+name, filepath.Join(n.dir, s+".log")); err != nil {
|
||||||
|
errors = append(errors, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Capture any file logs where the files exist
|
||||||
|
fileList, err := n.findFiles(ctx, "/var/log")
|
||||||
|
if err != nil {
|
||||||
|
errors = append(errors, fmt.Errorf("error reading /var/log: %v", err))
|
||||||
|
}
|
||||||
|
for _, name := range n.dumper.files {
|
||||||
|
prefix := "/var/log/" + name + ".log"
|
||||||
|
for _, f := range fileList {
|
||||||
|
if !strings.HasPrefix(f, prefix) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := n.shellToFile(ctx, "sudo cat "+f, filepath.Join(n.dir, filepath.Base(f))); err != nil {
|
||||||
|
errors = append(errors, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return errors
|
||||||
|
}
|
||||||
|
|
||||||
|
// findFiles lists files under the specified directory (recursively)
|
||||||
|
func (n *logDumperNode) findFiles(ctx context.Context, dir string) ([]string, error) {
|
||||||
|
var stdout bytes.Buffer
|
||||||
|
var stderr bytes.Buffer
|
||||||
|
|
||||||
|
err := n.client.ExecPiped(ctx, "sudo find "+dir+" -print0", &stdout, &stderr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error listing %q: %v", dir, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
paths := []string{}
|
||||||
|
for _, b := range bytes.Split(stdout.Bytes(), []byte{0}) {
|
||||||
|
if len(b) == 0 {
|
||||||
|
// Likely the last value
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
paths = append(paths, string(b))
|
||||||
|
}
|
||||||
|
return paths, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// listSystemdUnits returns the list of systemd units on the node
|
||||||
|
func (n *logDumperNode) listSystemdUnits(ctx context.Context) ([]string, error) {
|
||||||
|
var stdout bytes.Buffer
|
||||||
|
var stderr bytes.Buffer
|
||||||
|
|
||||||
|
err := n.client.ExecPiped(ctx, "sudo systemctl list-units -t service --no-pager --no-legend --all", &stdout, &stderr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error listing systemd units: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var services []string
|
||||||
|
for _, line := range strings.Split(stdout.String(), "\n") {
|
||||||
|
tokens := strings.Fields(line)
|
||||||
|
if len(tokens) == 0 || tokens[0] == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
services = append(services, tokens[0])
|
||||||
|
}
|
||||||
|
return services, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// shellToFile executes a command and copies the output to a file
|
||||||
|
func (n *logDumperNode) shellToFile(ctx context.Context, command string, destPath string) error {
|
||||||
|
if err := os.MkdirAll(filepath.Dir(destPath), 0755); err != nil {
|
||||||
|
log.Printf("unable to mkdir on %q: %v", filepath.Dir(destPath), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
f, err := os.Create(destPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error creating file %q: %v", destPath, err)
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
if err := n.client.ExecPiped(ctx, command, f, f); err != nil {
|
||||||
|
return fmt.Errorf("error executing command %q: %v", command, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// sshClientImplementation is the default implementation of sshClient, binding to a *ssh.Client
|
||||||
|
type sshClientImplementation struct {
|
||||||
|
client *ssh.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ sshClient = &sshClientImplementation{}
|
||||||
|
|
||||||
|
// ExecPiped implements sshClientImplementation::ExecPiped
|
||||||
|
func (s *sshClientImplementation) ExecPiped(ctx context.Context, cmd string, stdout io.Writer, stderr io.Writer) error {
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
finished := make(chan error)
|
||||||
|
go func() {
|
||||||
|
session, err := s.client.NewSession()
|
||||||
|
if err != nil {
|
||||||
|
finished <- fmt.Errorf("error creating ssh session: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer session.Close()
|
||||||
|
|
||||||
|
klog.V(2).Infof("running SSH command: %v", cmd)
|
||||||
|
|
||||||
|
session.Stdout = stdout
|
||||||
|
session.Stderr = stderr
|
||||||
|
|
||||||
|
finished <- session.Run(cmd)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.Print("closing SSH tcp connection due to context completion")
|
||||||
|
|
||||||
|
// terminate the TCP connection to force a disconnect - we assume everyone is using the same context.
|
||||||
|
// We could make this better by sending a signal on the session, waiting and then closing the session,
|
||||||
|
// and only if we still haven't succeeded then closing the TCP connection. This is sufficient for our
|
||||||
|
// current usage though - and hopefully that logic will be implemented in the SSH package itself.
|
||||||
|
s.Close()
|
||||||
|
|
||||||
|
<-finished // Wait for cancellation
|
||||||
|
return ctx.Err()
|
||||||
|
|
||||||
|
case err := <-finished:
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close implements sshClientImplementation::Close
|
||||||
|
func (s *sshClientImplementation) Close() error {
|
||||||
|
return s.client.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// sshClientFactoryImplementation is the default implementation of sshClientFactory
|
||||||
|
type sshClientFactoryImplementation struct {
|
||||||
|
sshConfig *ssh.ClientConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ sshClientFactory = &sshClientFactoryImplementation{}
|
||||||
|
|
||||||
|
// Dial implements sshClientFactory::Dial
|
||||||
|
func (f *sshClientFactoryImplementation) Dial(ctx context.Context, host string) (sshClient, error) {
|
||||||
|
addr := host + ":22"
|
||||||
|
d := net.Dialer{}
|
||||||
|
conn, err := d.DialContext(ctx, "tcp", addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// We have a TCP connection; we will force-close it to support context cancellation
|
||||||
|
|
||||||
|
var client *ssh.Client
|
||||||
|
finished := make(chan error)
|
||||||
|
go func() {
|
||||||
|
c, chans, reqs, err := ssh.NewClientConn(conn, addr, f.sshConfig)
|
||||||
|
if err == nil {
|
||||||
|
client = ssh.NewClient(c, chans, reqs)
|
||||||
|
}
|
||||||
|
finished <- err
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.Print("cancelling SSH tcp connection due to context completion")
|
||||||
|
conn.Close() // Close the TCP connection to force cancellation
|
||||||
|
<-finished // Wait for cancellation
|
||||||
|
return nil, ctx.Err()
|
||||||
|
case err := <-finished:
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &sshClientImplementation{
|
||||||
|
client: client,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue