StreamTranslator and FallbackExecutor for WebSockets
Kubernetes-commit: 168998e87bfd49a1b0bc6402761fafd5ace3bb3b
This commit is contained in:
parent
22cc602cb3
commit
969a47ceed
|
|
@ -28,6 +28,7 @@ import (
|
||||||
|
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/util/httpstream"
|
||||||
"k8s.io/cli-runtime/pkg/genericclioptions"
|
"k8s.io/cli-runtime/pkg/genericclioptions"
|
||||||
"k8s.io/cli-runtime/pkg/genericiooptions"
|
"k8s.io/cli-runtime/pkg/genericiooptions"
|
||||||
"k8s.io/cli-runtime/pkg/resource"
|
"k8s.io/cli-runtime/pkg/resource"
|
||||||
|
|
@ -125,7 +126,7 @@ func NewCmdAttach(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra.
|
||||||
|
|
||||||
// RemoteAttach defines the interface accepted by the Attach command - provided for test stubbing
|
// RemoteAttach defines the interface accepted by the Attach command - provided for test stubbing
|
||||||
type RemoteAttach interface {
|
type RemoteAttach interface {
|
||||||
Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error
|
Attach(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultAttachFunc is the default AttachFunc used
|
// DefaultAttachFunc is the default AttachFunc used
|
||||||
|
|
@ -148,7 +149,7 @@ func DefaultAttachFunc(o *AttachOptions, containerToAttach *corev1.Container, ra
|
||||||
TTY: raw,
|
TTY: raw,
|
||||||
}, scheme.ParameterCodec)
|
}, scheme.ParameterCodec)
|
||||||
|
|
||||||
return o.Attach.Attach("POST", req.URL(), o.Config, o.In, o.Out, o.ErrOut, raw, sizeQueue)
|
return o.Attach.Attach(req.URL(), o.Config, o.In, o.Out, o.ErrOut, raw, sizeQueue)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -156,11 +157,24 @@ func DefaultAttachFunc(o *AttachOptions, containerToAttach *corev1.Container, ra
|
||||||
type DefaultRemoteAttach struct{}
|
type DefaultRemoteAttach struct{}
|
||||||
|
|
||||||
// Attach executes attach to a running container
|
// Attach executes attach to a running container
|
||||||
func (*DefaultRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
|
func (*DefaultRemoteAttach) Attach(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
|
||||||
exec, err := remotecommand.NewSPDYExecutor(config, method, url)
|
// Legacy SPDY executor is default. If feature gate enabled, fallback
|
||||||
|
// executor attempts websockets first--then SPDY.
|
||||||
|
exec, err := remotecommand.NewSPDYExecutor(config, "POST", url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if cmdutil.RemoteCommandWebsockets.IsEnabled() {
|
||||||
|
// WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17).
|
||||||
|
websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", url.String())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
|
return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
|
||||||
Stdin: stdin,
|
Stdin: stdin,
|
||||||
Stdout: stdout,
|
Stdout: stdout,
|
||||||
|
|
|
||||||
|
|
@ -43,13 +43,11 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type fakeRemoteAttach struct {
|
type fakeRemoteAttach struct {
|
||||||
method string
|
url *url.URL
|
||||||
url *url.URL
|
err error
|
||||||
err error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *fakeRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
|
func (f *fakeRemoteAttach) Attach(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
|
||||||
f.method = method
|
|
||||||
f.url = url
|
f.url = url
|
||||||
return f.err
|
return f.err
|
||||||
}
|
}
|
||||||
|
|
@ -327,7 +325,7 @@ func TestAttach(t *testing.T) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return options.Attach.Attach("POST", u, nil, nil, nil, nil, raw, sizeQueue)
|
return options.Attach.Attach(u, nil, nil, nil, nil, raw, sizeQueue)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -347,9 +345,6 @@ func TestAttach(t *testing.T) {
|
||||||
t.Errorf("%s: Did not get expected path for exec request: %q %q", test.name, test.attachPath, remoteAttach.url.Path)
|
t.Errorf("%s: Did not get expected path for exec request: %q %q", test.name, test.attachPath, remoteAttach.url.Path)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if remoteAttach.method != "POST" {
|
|
||||||
t.Errorf("%s: Did not get method for attach request: %s", test.name, remoteAttach.method)
|
|
||||||
}
|
|
||||||
if remoteAttach.url.Query().Get("container") != "bar" {
|
if remoteAttach.url.Query().Get("container") != "bar" {
|
||||||
t.Errorf("%s: Did not have query parameters: %s", test.name, remoteAttach.url.Query())
|
t.Errorf("%s: Did not have query parameters: %s", test.name, remoteAttach.url.Query())
|
||||||
}
|
}
|
||||||
|
|
@ -428,7 +423,7 @@ func TestAttachWarnings(t *testing.T) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return options.Attach.Attach("POST", u, nil, nil, nil, nil, raw, sizeQueue)
|
return options.Attach.Attach(u, nil, nil, nil, nil, raw, sizeQueue)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,7 @@ import (
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/util/httpstream"
|
||||||
"k8s.io/cli-runtime/pkg/genericclioptions"
|
"k8s.io/cli-runtime/pkg/genericclioptions"
|
||||||
"k8s.io/cli-runtime/pkg/genericiooptions"
|
"k8s.io/cli-runtime/pkg/genericiooptions"
|
||||||
"k8s.io/cli-runtime/pkg/resource"
|
"k8s.io/cli-runtime/pkg/resource"
|
||||||
|
|
@ -113,17 +114,30 @@ func NewCmdExec(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra.Co
|
||||||
|
|
||||||
// RemoteExecutor defines the interface accepted by the Exec command - provided for test stubbing
|
// RemoteExecutor defines the interface accepted by the Exec command - provided for test stubbing
|
||||||
type RemoteExecutor interface {
|
type RemoteExecutor interface {
|
||||||
Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error
|
Execute(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultRemoteExecutor is the standard implementation of remote command execution
|
// DefaultRemoteExecutor is the standard implementation of remote command execution
|
||||||
type DefaultRemoteExecutor struct{}
|
type DefaultRemoteExecutor struct{}
|
||||||
|
|
||||||
func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
|
func (*DefaultRemoteExecutor) Execute(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
|
||||||
exec, err := remotecommand.NewSPDYExecutor(config, method, url)
|
// Legacy SPDY executor is default. If feature gate enabled, fallback
|
||||||
|
// executor attempts websockets first--then SPDY.
|
||||||
|
exec, err := remotecommand.NewSPDYExecutor(config, "POST", url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if cmdutil.RemoteCommandWebsockets.IsEnabled() {
|
||||||
|
// WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17).
|
||||||
|
websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", url.String())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
|
return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
|
||||||
Stdin: stdin,
|
Stdin: stdin,
|
||||||
Stdout: stdout,
|
Stdout: stdout,
|
||||||
|
|
@ -371,7 +385,7 @@ func (p *ExecOptions) Run() error {
|
||||||
TTY: t.Raw,
|
TTY: t.Raw,
|
||||||
}, scheme.ParameterCodec)
|
}, scheme.ParameterCodec)
|
||||||
|
|
||||||
return p.Executor.Execute("POST", req.URL(), p.Config, p.In, p.Out, p.ErrOut, t.Raw, sizeQueue)
|
return p.Executor.Execute(req.URL(), p.Config, p.In, p.Out, p.ErrOut, t.Raw, sizeQueue)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := t.Safe(fn); err != nil {
|
if err := t.Safe(fn); err != nil {
|
||||||
|
|
|
||||||
|
|
@ -40,13 +40,11 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type fakeRemoteExecutor struct {
|
type fakeRemoteExecutor struct {
|
||||||
method string
|
|
||||||
url *url.URL
|
url *url.URL
|
||||||
execErr error
|
execErr error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *fakeRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
|
func (f *fakeRemoteExecutor) Execute(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
|
||||||
f.method = method
|
|
||||||
f.url = url
|
f.url = url
|
||||||
return f.execErr
|
return f.execErr
|
||||||
}
|
}
|
||||||
|
|
@ -264,9 +262,6 @@ func TestExec(t *testing.T) {
|
||||||
t.Errorf("%s: Did not get expected container query param for exec request", test.name)
|
t.Errorf("%s: Did not get expected container query param for exec request", test.name)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if ex.method != "POST" {
|
|
||||||
t.Errorf("%s: Did not get method for exec request: %s", test.name, ex.method)
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -425,8 +425,10 @@ func GetPodRunningTimeoutFlag(cmd *cobra.Command) (time.Duration, error) {
|
||||||
type FeatureGate string
|
type FeatureGate string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
ApplySet FeatureGate = "KUBECTL_APPLYSET"
|
ApplySet FeatureGate = "KUBECTL_APPLYSET"
|
||||||
CmdPluginAsSubcommand FeatureGate = "KUBECTL_ENABLE_CMD_SHADOW"
|
CmdPluginAsSubcommand FeatureGate = "KUBECTL_ENABLE_CMD_SHADOW"
|
||||||
|
InteractiveDelete FeatureGate = "KUBECTL_INTERACTIVE_DELETE"
|
||||||
|
RemoteCommandWebsockets FeatureGate = "KUBECTL_REMOTE_COMMAND_WEBSOCKETS"
|
||||||
)
|
)
|
||||||
|
|
||||||
// IsEnabled returns true iff environment variable is set to true.
|
// IsEnabled returns true iff environment variable is set to true.
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue