Merge pull request #119186 from seans3/stream-translator-proxy

Stream Translator Proxy and FallbackExecutor for WebSockets

Kubernetes-commit: 87981480f33790225628824943217bd6bb7564bb
This commit is contained in:
Kubernetes Publisher 2023-10-24 17:10:34 +02:00
commit d574b30d56
7 changed files with 54 additions and 34 deletions

8
go.mod
View File

@ -31,9 +31,9 @@ require (
golang.org/x/sys v0.13.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.0.0-20231023194506-bfce70f1b5c8
k8s.io/apimachinery v0.0.0-20231024034334-1e138bd489ac
k8s.io/apimachinery v0.0.0-20231024171030-c18d2bfed439
k8s.io/cli-runtime v0.0.0-20231024042735-7a5787c1f8c2
k8s.io/client-go v0.0.0-20231024035150-c92537416a96
k8s.io/client-go v0.0.0-20231024171543-e2e59f3539ef
k8s.io/component-base v0.0.0-20231024040035-12d4256eb135
k8s.io/component-helpers v0.0.0-20231020235215-606f6e27cff2
k8s.io/klog/v2 v2.100.1
@ -97,9 +97,9 @@ require (
replace (
k8s.io/api => k8s.io/api v0.0.0-20231023194506-bfce70f1b5c8
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20231024034334-1e138bd489ac
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20231024171030-c18d2bfed439
k8s.io/cli-runtime => k8s.io/cli-runtime v0.0.0-20231024042735-7a5787c1f8c2
k8s.io/client-go => k8s.io/client-go v0.0.0-20231024035150-c92537416a96
k8s.io/client-go => k8s.io/client-go v0.0.0-20231024171543-e2e59f3539ef
k8s.io/code-generator => k8s.io/code-generator v0.0.0-20231024034032-0f8102fa0710
k8s.io/component-base => k8s.io/component-base v0.0.0-20231024040035-12d4256eb135
k8s.io/component-helpers => k8s.io/component-helpers v0.0.0-20231020235215-606f6e27cff2

8
go.sum
View File

@ -278,12 +278,12 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
k8s.io/api v0.0.0-20231023194506-bfce70f1b5c8 h1:U7xcM/WBTkLV+TjNciuW7l+oXM2OHd5/TmVnPKyrmpA=
k8s.io/api v0.0.0-20231023194506-bfce70f1b5c8/go.mod h1:mgYOiLIgrQcsuVxrBI6Pplk91r3sl5ZJ7eUx7UBMTkY=
k8s.io/apimachinery v0.0.0-20231024034334-1e138bd489ac h1:x3g6c1u7CtRoraBlRP2JThB3aHz7vw4FZFXRZsvoIoc=
k8s.io/apimachinery v0.0.0-20231024034334-1e138bd489ac/go.mod h1:mdlGhJWO1mhVzQXm1Lx7D1BvvBIVKlRVy0vvl1LwGjg=
k8s.io/apimachinery v0.0.0-20231024171030-c18d2bfed439 h1:/oxbLzC7mkHNdeFI8AMsTPTwudQu7sz7rnPGIxv2yqM=
k8s.io/apimachinery v0.0.0-20231024171030-c18d2bfed439/go.mod h1:mdlGhJWO1mhVzQXm1Lx7D1BvvBIVKlRVy0vvl1LwGjg=
k8s.io/cli-runtime v0.0.0-20231024042735-7a5787c1f8c2 h1:joPkHUx9n26GJj3rmQn/vebDK58bjWFCjQhw3jbTAz4=
k8s.io/cli-runtime v0.0.0-20231024042735-7a5787c1f8c2/go.mod h1:unBeO/pUYLN/9vAOaeXW8RLHPO99g8NPadjeFvvSI5U=
k8s.io/client-go v0.0.0-20231024035150-c92537416a96 h1:76J+c8hyhX3e9eteWOc08cGsJeH5ky0Jmh/naC0ll8g=
k8s.io/client-go v0.0.0-20231024035150-c92537416a96/go.mod h1:hML9Z37ARvWfQt+YEVEMZ3EVJBqM19lCsFXogGW6VX8=
k8s.io/client-go v0.0.0-20231024171543-e2e59f3539ef h1:dx12CsKyk2cct0NtF7fHMX8cOzb1uJhI5wn2sQDpr60=
k8s.io/client-go v0.0.0-20231024171543-e2e59f3539ef/go.mod h1:3HC2qEcjQxIt5UW1R7vC5RX2sf/wkRWovfAEPkbmPxA=
k8s.io/component-base v0.0.0-20231024040035-12d4256eb135 h1:BxZJ2rg42EI0RbeNV5gb+8tdwYZ1iwxZJW4FmUMMdtc=
k8s.io/component-base v0.0.0-20231024040035-12d4256eb135/go.mod h1:ft9o5mWD7glAMtEqdxl4CmAKA9G6DFYRajW3TPrsQhs=
k8s.io/component-helpers v0.0.0-20231020235215-606f6e27cff2 h1:UU4HaZWw14RLLGdOqFR0aWe0zIHS+b8KOGXTSkii4ng=

View File

@ -28,6 +28,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/genericiooptions"
"k8s.io/cli-runtime/pkg/resource"
@ -125,7 +126,7 @@ func NewCmdAttach(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra.
// RemoteAttach defines the interface accepted by the Attach command - provided for test stubbing
type RemoteAttach interface {
Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error
Attach(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error
}
// DefaultAttachFunc is the default AttachFunc used
@ -148,7 +149,7 @@ func DefaultAttachFunc(o *AttachOptions, containerToAttach *corev1.Container, ra
TTY: raw,
}, scheme.ParameterCodec)
return o.Attach.Attach("POST", req.URL(), o.Config, o.In, o.Out, o.ErrOut, raw, sizeQueue)
return o.Attach.Attach(req.URL(), o.Config, o.In, o.Out, o.ErrOut, raw, sizeQueue)
}
}
@ -156,11 +157,24 @@ func DefaultAttachFunc(o *AttachOptions, containerToAttach *corev1.Container, ra
type DefaultRemoteAttach struct{}
// Attach executes attach to a running container
func (*DefaultRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
exec, err := remotecommand.NewSPDYExecutor(config, method, url)
func (*DefaultRemoteAttach) Attach(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
// Legacy SPDY executor is default. If feature gate enabled, fallback
// executor attempts websockets first--then SPDY.
exec, err := remotecommand.NewSPDYExecutor(config, "POST", url)
if err != nil {
return err
}
if cmdutil.RemoteCommandWebsockets.IsEnabled() {
// WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17).
websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", url.String())
if err != nil {
return err
}
exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure)
if err != nil {
return err
}
}
return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
Stdin: stdin,
Stdout: stdout,

View File

@ -43,13 +43,11 @@ import (
)
type fakeRemoteAttach struct {
method string
url *url.URL
err error
url *url.URL
err error
}
func (f *fakeRemoteAttach) Attach(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
f.method = method
func (f *fakeRemoteAttach) Attach(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
f.url = url
return f.err
}
@ -327,7 +325,7 @@ func TestAttach(t *testing.T) {
return err
}
return options.Attach.Attach("POST", u, nil, nil, nil, nil, raw, sizeQueue)
return options.Attach.Attach(u, nil, nil, nil, nil, raw, sizeQueue)
}
}
@ -347,9 +345,6 @@ func TestAttach(t *testing.T) {
t.Errorf("%s: Did not get expected path for exec request: %q %q", test.name, test.attachPath, remoteAttach.url.Path)
return
}
if remoteAttach.method != "POST" {
t.Errorf("%s: Did not get method for attach request: %s", test.name, remoteAttach.method)
}
if remoteAttach.url.Query().Get("container") != "bar" {
t.Errorf("%s: Did not have query parameters: %s", test.name, remoteAttach.url.Query())
}
@ -428,7 +423,7 @@ func TestAttachWarnings(t *testing.T) {
return err
}
return options.Attach.Attach("POST", u, nil, nil, nil, nil, raw, sizeQueue)
return options.Attach.Attach(u, nil, nil, nil, nil, raw, sizeQueue)
}
}

View File

@ -27,6 +27,7 @@ import (
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/genericiooptions"
"k8s.io/cli-runtime/pkg/resource"
@ -113,17 +114,30 @@ func NewCmdExec(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra.Co
// RemoteExecutor defines the interface accepted by the Exec command - provided for test stubbing
type RemoteExecutor interface {
Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error
Execute(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error
}
// DefaultRemoteExecutor is the standard implementation of remote command execution
type DefaultRemoteExecutor struct{}
func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
exec, err := remotecommand.NewSPDYExecutor(config, method, url)
func (*DefaultRemoteExecutor) Execute(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
// Legacy SPDY executor is default. If feature gate enabled, fallback
// executor attempts websockets first--then SPDY.
exec, err := remotecommand.NewSPDYExecutor(config, "POST", url)
if err != nil {
return err
}
if cmdutil.RemoteCommandWebsockets.IsEnabled() {
// WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17).
websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", url.String())
if err != nil {
return err
}
exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure)
if err != nil {
return err
}
}
return exec.StreamWithContext(context.Background(), remotecommand.StreamOptions{
Stdin: stdin,
Stdout: stdout,
@ -371,7 +385,7 @@ func (p *ExecOptions) Run() error {
TTY: t.Raw,
}, scheme.ParameterCodec)
return p.Executor.Execute("POST", req.URL(), p.Config, p.In, p.Out, p.ErrOut, t.Raw, sizeQueue)
return p.Executor.Execute(req.URL(), p.Config, p.In, p.Out, p.ErrOut, t.Raw, sizeQueue)
}
if err := t.Safe(fn); err != nil {

View File

@ -40,13 +40,11 @@ import (
)
type fakeRemoteExecutor struct {
method string
url *url.URL
execErr error
}
func (f *fakeRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
f.method = method
func (f *fakeRemoteExecutor) Execute(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
f.url = url
return f.execErr
}
@ -264,9 +262,6 @@ func TestExec(t *testing.T) {
t.Errorf("%s: Did not get expected container query param for exec request", test.name)
return
}
if ex.method != "POST" {
t.Errorf("%s: Did not get method for exec request: %s", test.name, ex.method)
}
})
}
}

View File

@ -425,8 +425,10 @@ func GetPodRunningTimeoutFlag(cmd *cobra.Command) (time.Duration, error) {
type FeatureGate string
const (
ApplySet FeatureGate = "KUBECTL_APPLYSET"
CmdPluginAsSubcommand FeatureGate = "KUBECTL_ENABLE_CMD_SHADOW"
ApplySet FeatureGate = "KUBECTL_APPLYSET"
CmdPluginAsSubcommand FeatureGate = "KUBECTL_ENABLE_CMD_SHADOW"
InteractiveDelete FeatureGate = "KUBECTL_INTERACTIVE_DELETE"
RemoteCommandWebsockets FeatureGate = "KUBECTL_REMOTE_COMMAND_WEBSOCKETS"
)
// IsEnabled returns true iff environment variable is set to true.