From 969a47ceedcf8bdaffded6323745ce3c4dfc425f Mon Sep 17 00:00:00 2001 From: Sean Sullivan Date: Thu, 6 Jul 2023 21:22:07 -0700 Subject: [PATCH] StreamTranslator and FallbackExecutor for WebSockets Kubernetes-commit: 168998e87bfd49a1b0bc6402761fafd5ace3bb3b --- pkg/cmd/attach/attach.go | 22 ++++++++++++++++++---- pkg/cmd/attach/attach_test.go | 15 +++++---------- pkg/cmd/exec/exec.go | 22 ++++++++++++++++++---- pkg/cmd/exec/exec_test.go | 7 +------ pkg/cmd/util/helpers.go | 6 ++++-- 5 files changed, 46 insertions(+), 26 deletions(-) diff --git a/pkg/cmd/attach/attach.go b/pkg/cmd/attach/attach.go index 263e006c4..af25e0729 100644 --- a/pkg/cmd/attach/attach.go +++ b/pkg/cmd/attach/attach.go @@ -28,6 +28,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/genericiooptions" "k8s.io/cli-runtime/pkg/resource" @@ -125,7 +126,7 @@ func NewCmdAttach(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra. // RemoteAttach defines the interface accepted by the Attach command - provided for test stubbing type RemoteAttach interface { - Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error + Attach(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error } // DefaultAttachFunc is the default AttachFunc used @@ -148,7 +149,7 @@ func DefaultAttachFunc(o *AttachOptions, containerToAttach *corev1.Container, ra TTY: raw, }, scheme.ParameterCodec) - return o.Attach.Attach("POST", req.URL(), o.Config, o.In, o.Out, o.ErrOut, raw, sizeQueue) + return o.Attach.Attach(req.URL(), o.Config, o.In, o.Out, o.ErrOut, raw, sizeQueue) } } @@ -156,11 +157,24 @@ func DefaultAttachFunc(o *AttachOptions, containerToAttach *corev1.Container, ra type DefaultRemoteAttach struct{} // Attach executes attach to a running container -func (*DefaultRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error { - exec, err := remotecommand.NewSPDYExecutor(config, method, url) +func (*DefaultRemoteAttach) Attach(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error { + // Legacy SPDY executor is default. If feature gate enabled, fallback + // executor attempts websockets first--then SPDY. + exec, err := remotecommand.NewSPDYExecutor(config, "POST", url) if err != nil { return err } + if cmdutil.RemoteCommandWebsockets.IsEnabled() { + // WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17). + websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", url.String()) + if err != nil { + return err + } + exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure) + if err != nil { + return err + } + } return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{ Stdin: stdin, Stdout: stdout, diff --git a/pkg/cmd/attach/attach_test.go b/pkg/cmd/attach/attach_test.go index 24b6e71d2..6d491323e 100644 --- a/pkg/cmd/attach/attach_test.go +++ b/pkg/cmd/attach/attach_test.go @@ -43,13 +43,11 @@ import ( ) type fakeRemoteAttach struct { - method string - url *url.URL - err error + url *url.URL + err error } -func (f *fakeRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error { - f.method = method +func (f *fakeRemoteAttach) Attach(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error { f.url = url return f.err } @@ -327,7 +325,7 @@ func TestAttach(t *testing.T) { return err } - return options.Attach.Attach("POST", u, nil, nil, nil, nil, raw, sizeQueue) + return options.Attach.Attach(u, nil, nil, nil, nil, raw, sizeQueue) } } @@ -347,9 +345,6 @@ func TestAttach(t *testing.T) { t.Errorf("%s: Did not get expected path for exec request: %q %q", test.name, test.attachPath, remoteAttach.url.Path) return } - if remoteAttach.method != "POST" { - t.Errorf("%s: Did not get method for attach request: %s", test.name, remoteAttach.method) - } if remoteAttach.url.Query().Get("container") != "bar" { t.Errorf("%s: Did not have query parameters: %s", test.name, remoteAttach.url.Query()) } @@ -428,7 +423,7 @@ func TestAttachWarnings(t *testing.T) { return err } - return options.Attach.Attach("POST", u, nil, nil, nil, nil, raw, sizeQueue) + return options.Attach.Attach(u, nil, nil, nil, nil, raw, sizeQueue) } } diff --git a/pkg/cmd/exec/exec.go b/pkg/cmd/exec/exec.go index 2a29aecf8..36d43bece 100644 --- a/pkg/cmd/exec/exec.go +++ b/pkg/cmd/exec/exec.go @@ -27,6 +27,7 @@ import ( "github.com/spf13/cobra" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/genericiooptions" "k8s.io/cli-runtime/pkg/resource" @@ -113,17 +114,30 @@ func NewCmdExec(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra.Co // RemoteExecutor defines the interface accepted by the Exec command - provided for test stubbing type RemoteExecutor interface { - Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error + Execute(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error } // DefaultRemoteExecutor is the standard implementation of remote command execution type DefaultRemoteExecutor struct{} -func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error { - exec, err := remotecommand.NewSPDYExecutor(config, method, url) +func (*DefaultRemoteExecutor) Execute(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error { + // Legacy SPDY executor is default. If feature gate enabled, fallback + // executor attempts websockets first--then SPDY. + exec, err := remotecommand.NewSPDYExecutor(config, "POST", url) if err != nil { return err } + if cmdutil.RemoteCommandWebsockets.IsEnabled() { + // WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17). + websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", url.String()) + if err != nil { + return err + } + exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure) + if err != nil { + return err + } + } return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{ Stdin: stdin, Stdout: stdout, @@ -371,7 +385,7 @@ func (p *ExecOptions) Run() error { TTY: t.Raw, }, scheme.ParameterCodec) - return p.Executor.Execute("POST", req.URL(), p.Config, p.In, p.Out, p.ErrOut, t.Raw, sizeQueue) + return p.Executor.Execute(req.URL(), p.Config, p.In, p.Out, p.ErrOut, t.Raw, sizeQueue) } if err := t.Safe(fn); err != nil { diff --git a/pkg/cmd/exec/exec_test.go b/pkg/cmd/exec/exec_test.go index 82ffe85e7..7305231f1 100644 --- a/pkg/cmd/exec/exec_test.go +++ b/pkg/cmd/exec/exec_test.go @@ -40,13 +40,11 @@ import ( ) type fakeRemoteExecutor struct { - method string url *url.URL execErr error } -func (f *fakeRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error { - f.method = method +func (f *fakeRemoteExecutor) Execute(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error { f.url = url return f.execErr } @@ -264,9 +262,6 @@ func TestExec(t *testing.T) { t.Errorf("%s: Did not get expected container query param for exec request", test.name) return } - if ex.method != "POST" { - t.Errorf("%s: Did not get method for exec request: %s", test.name, ex.method) - } }) } } diff --git a/pkg/cmd/util/helpers.go b/pkg/cmd/util/helpers.go index bbb6e701b..03f3e7f0c 100644 --- a/pkg/cmd/util/helpers.go +++ b/pkg/cmd/util/helpers.go @@ -425,8 +425,10 @@ func GetPodRunningTimeoutFlag(cmd *cobra.Command) (time.Duration, error) { type FeatureGate string const ( - ApplySet FeatureGate = "KUBECTL_APPLYSET" - CmdPluginAsSubcommand FeatureGate = "KUBECTL_ENABLE_CMD_SHADOW" + ApplySet FeatureGate = "KUBECTL_APPLYSET" + CmdPluginAsSubcommand FeatureGate = "KUBECTL_ENABLE_CMD_SHADOW" + InteractiveDelete FeatureGate = "KUBECTL_INTERACTIVE_DELETE" + RemoteCommandWebsockets FeatureGate = "KUBECTL_REMOTE_COMMAND_WEBSOCKETS" ) // IsEnabled returns true iff environment variable is set to true.