From 858d3a5b72f95ec30c18193ecf2f11f0596bf535 Mon Sep 17 00:00:00 2001 From: Sean Sullivan Date: Wed, 21 Feb 2024 08:56:07 +0000 Subject: [PATCH] portforward: tunnel spdy through websockets Kubernetes-commit: 8b447d8c97e8823b4308eb91cf7d75693e867c61 --- pkg/cmd/portforward/portforward.go | 39 ++++++++++++++++++++----- pkg/cmd/portforward/portforward_test.go | 5 +++- pkg/cmd/util/helpers.go | 1 + 3 files changed, 36 insertions(+), 9 deletions(-) diff --git a/pkg/cmd/portforward/portforward.go b/pkg/cmd/portforward/portforward.go index 31e1eef7..f0200354 100644 --- a/pkg/cmd/portforward/portforward.go +++ b/pkg/cmd/portforward/portforward.go @@ -31,6 +31,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/cli-runtime/pkg/genericiooptions" "k8s.io/client-go/kubernetes/scheme" @@ -50,7 +51,7 @@ import ( type PortForwardOptions struct { Namespace string PodName string - RESTClient *restclient.RESTClient + RESTClient restclient.Interface Config *restclient.Config PodClient corev1client.PodsGetter Address []string @@ -99,11 +100,7 @@ const ( ) func NewCmdPortForward(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra.Command { - opts := &PortForwardOptions{ - PortForwarder: &defaultPortForwarder{ - IOStreams: streams, - }, - } + opts := NewDefaultPortForwardOptions(streams) cmd := &cobra.Command{ Use: "port-forward TYPE/NAME [options] [LOCAL_PORT:]REMOTE_PORT [...[LOCAL_PORT_N:]REMOTE_PORT_N]", DisableFlagsInUseLine: true, @@ -123,6 +120,14 @@ func NewCmdPortForward(f cmdutil.Factory, streams genericiooptions.IOStreams) *c return cmd } +func NewDefaultPortForwardOptions(streams genericiooptions.IOStreams) *PortForwardOptions { + return &PortForwardOptions{ + PortForwarder: &defaultPortForwarder{ + IOStreams: streams, + }, + } +} + type portForwarder interface { ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error } @@ -137,6 +142,14 @@ func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts Po return err } dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url) + if cmdutil.PortForwardWebsockets.IsEnabled() { + tunnelingDialer, err := portforward.NewSPDYOverWebsocketDialer(url, opts.Config) + if err != nil { + return err + } + // First attempt tunneling (websocket) dialer, then fallback to spdy dialer. + dialer = portforward.NewFallbackDialer(tunnelingDialer, dialer, httpstream.IsUpgradeFailure) + } fw, err := portforward.NewOnAddresses(dialer, opts.Address, opts.Ports, opts.StopChannel, opts.ReadyChannel, f.Out, f.ErrOut) if err != nil { return err @@ -387,7 +400,14 @@ func (o PortForwardOptions) Validate() error { // RunPortForward implements all the necessary functionality for port-forward cmd. func (o PortForwardOptions) RunPortForward() error { - pod, err := o.PodClient.Pods(o.Namespace).Get(context.TODO(), o.PodName, metav1.GetOptions{}) + return o.RunPortForwardContext(context.Background()) +} + +// RunPortForwardContext implements all the necessary functionality for port-forward cmd. +// It ends portforwarding when an error is received from the backend, or an os.Interrupt +// signal is received, or the provided context is done. +func (o PortForwardOptions) RunPortForwardContext(ctx context.Context) error { + pod, err := o.PodClient.Pods(o.Namespace).Get(ctx, o.PodName, metav1.GetOptions{}) if err != nil { return err } @@ -401,7 +421,10 @@ func (o PortForwardOptions) RunPortForward() error { defer signal.Stop(signals) go func() { - <-signals + select { + case <-signals: + case <-ctx.Done(): + } if o.StopChannel != nil { close(o.StopChannel) } diff --git a/pkg/cmd/portforward/portforward_test.go b/pkg/cmd/portforward/portforward_test.go index fb2252c5..2aee31e7 100644 --- a/pkg/cmd/portforward/portforward_test.go +++ b/pkg/cmd/portforward/portforward_test.go @@ -17,6 +17,7 @@ limitations under the License. package portforward import ( + "context" "fmt" "net/http" "net/url" @@ -101,6 +102,8 @@ func testPortForward(t *testing.T, flags map[string]string, args []string) { } opts := &PortForwardOptions{} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() cmd := NewCmdPortForward(tf, genericiooptions.NewTestIOStreamsDiscard()) cmd.Run = func(cmd *cobra.Command, args []string) { if err = opts.Complete(tf, cmd, args); err != nil { @@ -110,7 +113,7 @@ func testPortForward(t *testing.T, flags map[string]string, args []string) { if err = opts.Validate(); err != nil { return } - err = opts.RunPortForward() + err = opts.RunPortForwardContext(ctx) } for name, value := range flags { diff --git a/pkg/cmd/util/helpers.go b/pkg/cmd/util/helpers.go index fe45d118..d9e401fa 100644 --- a/pkg/cmd/util/helpers.go +++ b/pkg/cmd/util/helpers.go @@ -430,6 +430,7 @@ const ( InteractiveDelete FeatureGate = "KUBECTL_INTERACTIVE_DELETE" OpenAPIV3Patch FeatureGate = "KUBECTL_OPENAPIV3_PATCH" RemoteCommandWebsockets FeatureGate = "KUBECTL_REMOTE_COMMAND_WEBSOCKETS" + PortForwardWebsockets FeatureGate = "KUBECTL_PORT_FORWARD_WEBSOCKETS" ) // IsEnabled returns true iff environment variable is set to true.