portforward: tunnel spdy through websockets

Kubernetes-commit: 8b447d8c97e8823b4308eb91cf7d75693e867c61
This commit is contained in:
Sean Sullivan 2024-02-21 08:56:07 +00:00 committed by Kubernetes Publisher
parent ecc9600ea2
commit 858d3a5b72
3 changed files with 36 additions and 9 deletions

View File

@ -31,6 +31,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/cli-runtime/pkg/genericiooptions"
"k8s.io/client-go/kubernetes/scheme"
@ -50,7 +51,7 @@ import (
type PortForwardOptions struct {
Namespace string
PodName string
RESTClient *restclient.RESTClient
RESTClient restclient.Interface
Config *restclient.Config
PodClient corev1client.PodsGetter
Address []string
@ -99,11 +100,7 @@ const (
)
func NewCmdPortForward(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra.Command {
opts := &PortForwardOptions{
PortForwarder: &defaultPortForwarder{
IOStreams: streams,
},
}
opts := NewDefaultPortForwardOptions(streams)
cmd := &cobra.Command{
Use: "port-forward TYPE/NAME [options] [LOCAL_PORT:]REMOTE_PORT [...[LOCAL_PORT_N:]REMOTE_PORT_N]",
DisableFlagsInUseLine: true,
@ -123,6 +120,14 @@ func NewCmdPortForward(f cmdutil.Factory, streams genericiooptions.IOStreams) *c
return cmd
}
func NewDefaultPortForwardOptions(streams genericiooptions.IOStreams) *PortForwardOptions {
return &PortForwardOptions{
PortForwarder: &defaultPortForwarder{
IOStreams: streams,
},
}
}
type portForwarder interface {
ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error
}
@ -137,6 +142,14 @@ func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts Po
return err
}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
if cmdutil.PortForwardWebsockets.IsEnabled() {
tunnelingDialer, err := portforward.NewSPDYOverWebsocketDialer(url, opts.Config)
if err != nil {
return err
}
// First attempt tunneling (websocket) dialer, then fallback to spdy dialer.
dialer = portforward.NewFallbackDialer(tunnelingDialer, dialer, httpstream.IsUpgradeFailure)
}
fw, err := portforward.NewOnAddresses(dialer, opts.Address, opts.Ports, opts.StopChannel, opts.ReadyChannel, f.Out, f.ErrOut)
if err != nil {
return err
@ -387,7 +400,14 @@ func (o PortForwardOptions) Validate() error {
// RunPortForward implements all the necessary functionality for port-forward cmd.
func (o PortForwardOptions) RunPortForward() error {
pod, err := o.PodClient.Pods(o.Namespace).Get(context.TODO(), o.PodName, metav1.GetOptions{})
return o.RunPortForwardContext(context.Background())
}
// RunPortForwardContext implements all the necessary functionality for port-forward cmd.
// It ends portforwarding when an error is received from the backend, or an os.Interrupt
// signal is received, or the provided context is done.
func (o PortForwardOptions) RunPortForwardContext(ctx context.Context) error {
pod, err := o.PodClient.Pods(o.Namespace).Get(ctx, o.PodName, metav1.GetOptions{})
if err != nil {
return err
}
@ -401,7 +421,10 @@ func (o PortForwardOptions) RunPortForward() error {
defer signal.Stop(signals)
go func() {
<-signals
select {
case <-signals:
case <-ctx.Done():
}
if o.StopChannel != nil {
close(o.StopChannel)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package portforward
import (
"context"
"fmt"
"net/http"
"net/url"
@ -101,6 +102,8 @@ func testPortForward(t *testing.T, flags map[string]string, args []string) {
}
opts := &PortForwardOptions{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cmd := NewCmdPortForward(tf, genericiooptions.NewTestIOStreamsDiscard())
cmd.Run = func(cmd *cobra.Command, args []string) {
if err = opts.Complete(tf, cmd, args); err != nil {
@ -110,7 +113,7 @@ func testPortForward(t *testing.T, flags map[string]string, args []string) {
if err = opts.Validate(); err != nil {
return
}
err = opts.RunPortForward()
err = opts.RunPortForwardContext(ctx)
}
for name, value := range flags {

View File

@ -430,6 +430,7 @@ const (
InteractiveDelete FeatureGate = "KUBECTL_INTERACTIVE_DELETE"
OpenAPIV3Patch FeatureGate = "KUBECTL_OPENAPIV3_PATCH"
RemoteCommandWebsockets FeatureGate = "KUBECTL_REMOTE_COMMAND_WEBSOCKETS"
PortForwardWebsockets FeatureGate = "KUBECTL_PORT_FORWARD_WEBSOCKETS"
)
// IsEnabled returns true iff environment variable is set to true.