fix karmada can not execute exec command with proxy url and header
Signed-off-by: changzhen <changzhen5@huawei.com>
This commit is contained in:
parent
dd6a313add
commit
2e3c2e5e9b
4
go.mod
4
go.mod
|
@ -12,6 +12,7 @@ require (
|
|||
github.com/golang/mock v1.6.0
|
||||
github.com/google/go-cmp v0.5.9
|
||||
github.com/kr/pretty v0.3.1
|
||||
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f
|
||||
github.com/olekukonko/tablewriter v0.0.5
|
||||
github.com/onsi/ginkgo/v2 v2.11.0
|
||||
github.com/onsi/gomega v1.27.8
|
||||
|
@ -23,6 +24,7 @@ require (
|
|||
github.com/vektra/mockery/v2 v2.10.0
|
||||
github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64
|
||||
go.uber.org/atomic v1.9.0
|
||||
golang.org/x/net v0.12.0
|
||||
golang.org/x/term v0.10.0
|
||||
golang.org/x/text v0.11.0
|
||||
golang.org/x/time v0.3.0
|
||||
|
@ -125,7 +127,6 @@ require (
|
|||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
|
||||
github.com/opencontainers/go-digest v1.0.0 // indirect
|
||||
github.com/pelletier/go-toml v1.9.4 // indirect
|
||||
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
|
||||
|
@ -166,7 +167,6 @@ require (
|
|||
go.uber.org/zap v1.24.0 // indirect
|
||||
golang.org/x/crypto v0.11.0 // indirect
|
||||
golang.org/x/mod v0.10.0 // indirect
|
||||
golang.org/x/net v0.12.0 // indirect
|
||||
golang.org/x/oauth2 v0.10.0 // indirect
|
||||
golang.org/x/sync v0.2.0 // indirect
|
||||
golang.org/x/sys v0.10.0 // indirect
|
||||
|
|
|
@ -8,27 +8,29 @@ import (
|
|||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
authenticationv1 "k8s.io/api/authentication/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/util/httpstream"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
"k8s.io/apimachinery/pkg/util/proxy"
|
||||
proxyutil "k8s.io/apimachinery/pkg/util/proxy"
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
registryrest "k8s.io/apiserver/pkg/registry/rest"
|
||||
|
||||
clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster"
|
||||
)
|
||||
|
||||
// ConnectCluster returns a handler for proxy cluster.
|
||||
func ConnectCluster(ctx context.Context, cluster *clusterapis.Cluster, proxyPath string,
|
||||
secretGetter func(context.Context, string, string) (*corev1.Secret, error), responder rest.Responder) (http.Handler, error) {
|
||||
location, transport, err := Location(cluster)
|
||||
func ConnectCluster(ctx context.Context, cluster *clusterapis.Cluster, proxyPath string, secretGetter func(context.Context, string, string) (*corev1.Secret, error), responder registryrest.Responder) (http.Handler, error) {
|
||||
location, proxyTransport, err := Location(cluster)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
location.Path = path.Join(location.Path, proxyPath)
|
||||
|
||||
if cluster.Spec.ImpersonatorSecretRef == nil {
|
||||
|
@ -45,12 +47,50 @@ func ConnectCluster(ctx context.Context, cluster *clusterapis.Cluster, proxyPath
|
|||
return nil, fmt.Errorf("failed to get impresonateToken for cluster %s: %v", cluster.Name, err)
|
||||
}
|
||||
|
||||
return newProxyHandler(location, transport, impersonateToken, responder)
|
||||
return newProxyHandler(location, proxyTransport, cluster, impersonateToken, responder)
|
||||
}
|
||||
|
||||
func newProxyHandler(location *url.URL, proxyTransport http.RoundTripper, cluster *clusterapis.Cluster, impersonateToken string, responder registryrest.Responder) (http.Handler, error) {
|
||||
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
requester, exist := request.UserFrom(req.Context())
|
||||
if !exist {
|
||||
responsewriters.InternalError(rw, req, errors.New("no user found for request"))
|
||||
return
|
||||
}
|
||||
|
||||
req.Header.Set(authenticationv1.ImpersonateUserHeader, requester.GetName())
|
||||
for _, group := range requester.GetGroups() {
|
||||
if !skipGroup(group) {
|
||||
req.Header.Add(authenticationv1.ImpersonateGroupHeader, group)
|
||||
}
|
||||
}
|
||||
req.Header.Set("Authorization", fmt.Sprintf("bearer %s", impersonateToken))
|
||||
|
||||
var proxyURL *url.URL
|
||||
if proxyURLStr := cluster.Spec.ProxyURL; proxyURLStr != "" {
|
||||
proxyURL, _ = url.Parse(proxyURLStr)
|
||||
}
|
||||
|
||||
// Retain RawQuery in location because upgrading the request will use it.
|
||||
// See https://github.com/karmada-io/karmada/issues/1618#issuecomment-1103793290 for more info.
|
||||
location.RawQuery = req.URL.RawQuery
|
||||
|
||||
upgradeDialer := NewUpgradeDialerWithConfig(UpgradeDialerWithConfig{
|
||||
TLS: &tls.Config{InsecureSkipVerify: true}, //nolint:gosec
|
||||
Proxier: http.ProxyURL(proxyURL),
|
||||
PingPeriod: time.Second * 5,
|
||||
Header: ParseProxyHeaders(cluster.Spec.ProxyHeader),
|
||||
})
|
||||
|
||||
handler := NewUpgradeAwareHandler(location, proxyTransport, false, httpstream.IsUpgradeRequest(req), proxyutil.NewErrorResponder(responder))
|
||||
handler.UpgradeDialer = upgradeDialer
|
||||
handler.ServeHTTP(rw, req)
|
||||
}), nil
|
||||
}
|
||||
|
||||
// NewThrottledUpgradeAwareProxyHandler creates a new proxy handler with a default flush interval. Responder is required for returning
|
||||
// errors to the caller.
|
||||
func NewThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) *proxy.UpgradeAwareHandler {
|
||||
func NewThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder registryrest.Responder) *proxy.UpgradeAwareHandler {
|
||||
return proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder))
|
||||
}
|
||||
|
||||
|
@ -61,12 +101,12 @@ func Location(cluster *clusterapis.Cluster) (*url.URL, http.RoundTripper, error)
|
|||
return nil, nil, err
|
||||
}
|
||||
|
||||
transport, err := createProxyTransport(cluster)
|
||||
proxyTransport, err := createProxyTransport(cluster)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return location, transport, nil
|
||||
return location, proxyTransport, nil
|
||||
}
|
||||
|
||||
func constructLocation(cluster *clusterapis.Cluster) (*url.URL, error) {
|
||||
|
@ -96,10 +136,24 @@ func createProxyTransport(cluster *clusterapis.Cluster) (*http.Transport, error)
|
|||
return nil, fmt.Errorf("failed to parse url of proxy url %s: %v", proxyURL, err)
|
||||
}
|
||||
trans.Proxy = http.ProxyURL(u)
|
||||
trans.ProxyConnectHeader = ParseProxyHeaders(cluster.Spec.ProxyHeader)
|
||||
}
|
||||
return trans, nil
|
||||
}
|
||||
|
||||
func ParseProxyHeaders(proxyHeaders map[string]string) http.Header {
|
||||
if len(proxyHeaders) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
header := http.Header{}
|
||||
for headerKey, headerValues := range proxyHeaders {
|
||||
values := strings.Split(headerValues, ",")
|
||||
header[headerKey] = values
|
||||
}
|
||||
return header
|
||||
}
|
||||
|
||||
func getImpersonateToken(clusterName string, secret *corev1.Secret) (string, error) {
|
||||
token, found := secret.Data[clusterapis.SecretTokenKey]
|
||||
if !found {
|
||||
|
@ -108,31 +162,6 @@ func getImpersonateToken(clusterName string, secret *corev1.Secret) (string, err
|
|||
return string(token), nil
|
||||
}
|
||||
|
||||
func newProxyHandler(location *url.URL, transport http.RoundTripper, impersonateToken string, responder rest.Responder) (http.Handler, error) {
|
||||
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
requester, exist := request.UserFrom(req.Context())
|
||||
if !exist {
|
||||
responsewriters.InternalError(rw, req, errors.New("no user found for request"))
|
||||
return
|
||||
}
|
||||
req.Header.Set(authenticationv1.ImpersonateUserHeader, requester.GetName())
|
||||
for _, group := range requester.GetGroups() {
|
||||
if !skipGroup(group) {
|
||||
req.Header.Add(authenticationv1.ImpersonateGroupHeader, group)
|
||||
}
|
||||
}
|
||||
|
||||
req.Header.Set("Authorization", fmt.Sprintf("bearer %s", impersonateToken))
|
||||
|
||||
// Retain RawQuery in location because upgrading the request will use it.
|
||||
// See https://github.com/karmada-io/karmada/issues/1618#issuecomment-1103793290 for more info.
|
||||
location.RawQuery = req.URL.RawQuery
|
||||
|
||||
handler := NewThrottledUpgradeAwareProxyHandler(location, transport, true, false, responder)
|
||||
handler.ServeHTTP(rw, req)
|
||||
}), nil
|
||||
}
|
||||
|
||||
func skipGroup(group string) bool {
|
||||
switch group {
|
||||
case user.AllAuthenticated, user.AllUnauthenticated:
|
||||
|
|
|
@ -0,0 +1,271 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/proxy"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
"k8s.io/apimachinery/third_party/forked/golang/netutil"
|
||||
)
|
||||
|
||||
// UpgradeDialer knows how to upgrade an HTTP request to one that supports
|
||||
// multiplexed streams. After Dial() is invoked, Conn will be usable.
|
||||
type UpgradeDialer struct {
|
||||
//tlsConfig holds the TLS configuration settings to use when connecting
|
||||
//to the remote server.
|
||||
tlsConfig *tls.Config
|
||||
|
||||
// Dialer is the dialer used to connect. Used if non-nil.
|
||||
Dialer *net.Dialer
|
||||
|
||||
// header holds the HTTP request headers for dialing.
|
||||
header http.Header
|
||||
|
||||
// proxier knows which proxy to use given a request.
|
||||
proxier func(req *http.Request) (*url.URL, error)
|
||||
|
||||
// pingPeriod is a period for sending Ping frames over established
|
||||
// connections.
|
||||
pingPeriod time.Duration
|
||||
}
|
||||
|
||||
var _ utilnet.TLSClientConfigHolder = &UpgradeDialer{}
|
||||
var _ utilnet.Dialer = &UpgradeDialer{}
|
||||
|
||||
// NewUpgradeDialerWithConfig creates a new UpgradeRoundTripper with the specified
|
||||
// configuration.
|
||||
func NewUpgradeDialerWithConfig(cfg UpgradeDialerWithConfig) *UpgradeDialer {
|
||||
if cfg.Proxier == nil {
|
||||
cfg.Proxier = utilnet.NewProxierWithNoProxyCIDR(http.ProxyFromEnvironment)
|
||||
}
|
||||
return &UpgradeDialer{
|
||||
tlsConfig: cfg.TLS,
|
||||
proxier: cfg.Proxier,
|
||||
pingPeriod: cfg.PingPeriod,
|
||||
header: cfg.Header,
|
||||
}
|
||||
}
|
||||
|
||||
// UpgradeDialerWithConfig is a set of options for an UpgradeDialer.
|
||||
type UpgradeDialerWithConfig struct {
|
||||
// TLS configuration used by the round tripper.
|
||||
TLS *tls.Config
|
||||
// Header holds the HTTP request headers for dialing. Optional.
|
||||
Header http.Header
|
||||
// Proxier is a proxy function invoked on each request. Optional.
|
||||
Proxier func(*http.Request) (*url.URL, error)
|
||||
// PingPeriod is a period for sending Pings on the connection.
|
||||
// Optional.
|
||||
PingPeriod time.Duration
|
||||
}
|
||||
|
||||
// TLSClientConfig implements pkg/util/net.TLSClientConfigHolder for proper TLS checking during
|
||||
// proxying with a roundtripper.
|
||||
func (u *UpgradeDialer) TLSClientConfig() *tls.Config {
|
||||
return u.tlsConfig
|
||||
}
|
||||
|
||||
// Dial implements k8s.io/apimachinery/pkg/util/net.Dialer.
|
||||
func (u *UpgradeDialer) Dial(req *http.Request) (net.Conn, error) {
|
||||
conn, err := u.dial(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := req.Write(conn); err != nil {
|
||||
conn.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
// dial dials the host specified by req, using TLS if appropriate.
|
||||
func (u *UpgradeDialer) dial(req *http.Request) (net.Conn, error) {
|
||||
proxyURL, err := u.proxier(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if proxyURL == nil {
|
||||
return u.dialWithoutProxy(req.Context(), req.URL)
|
||||
}
|
||||
|
||||
switch proxyURL.Scheme {
|
||||
case "socks5":
|
||||
return u.dialWithSocks5Proxy(req, proxyURL)
|
||||
case "https", "http", "":
|
||||
return u.dialWithHttpProxy(req, proxyURL)
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("proxy URL scheme not supported: %s", proxyURL.Scheme)
|
||||
}
|
||||
|
||||
// dialWithHttpProxy dials the host specified by url through an http or an https proxy.
|
||||
func (u *UpgradeDialer) dialWithHttpProxy(req *http.Request, proxyURL *url.URL) (net.Conn, error) {
|
||||
// ensure we use a canonical host with proxyReq
|
||||
targetHost := netutil.CanonicalAddr(req.URL)
|
||||
|
||||
// proxying logic adapted from http://blog.h6t.eu/post/74098062923/golang-websocket-with-http-proxy-support
|
||||
proxyReq := http.Request{
|
||||
Method: "CONNECT",
|
||||
URL: &url.URL{},
|
||||
Host: targetHost,
|
||||
Header: u.header,
|
||||
}
|
||||
|
||||
proxyReq = *proxyReq.WithContext(req.Context())
|
||||
|
||||
if pa := u.proxyAuth(proxyURL); pa != "" {
|
||||
proxyReq.Header.Set("Proxy-Authorization", pa)
|
||||
}
|
||||
|
||||
proxyDialConn, err := u.dialWithoutProxy(proxyReq.Context(), proxyURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
//nolint:staticcheck // SA1019 ignore deprecated httputil.NewProxyClientConn
|
||||
proxyClientConn := httputil.NewProxyClientConn(proxyDialConn, nil)
|
||||
response, err := proxyClientConn.Do(&proxyReq)
|
||||
//nolint:staticcheck // SA1019 ignore deprecated httputil.ErrPersistEOF: it might be
|
||||
// returned from the invocation of proxyClientConn.Do
|
||||
if err != nil && err != httputil.ErrPersistEOF {
|
||||
return nil, err
|
||||
}
|
||||
if response != nil && response.StatusCode >= 300 || response.StatusCode < 200 {
|
||||
return nil, fmt.Errorf("CONNECT request to %s returned response: %s", proxyURL.Redacted(), response.Status)
|
||||
}
|
||||
|
||||
rwc, _ := proxyClientConn.Hijack()
|
||||
|
||||
if req.URL.Scheme == "https" {
|
||||
return u.tlsConn(proxyReq.Context(), rwc, targetHost)
|
||||
}
|
||||
return rwc, nil
|
||||
}
|
||||
|
||||
// dialWithSocks5Proxy dials the host specified by url through a socks5 proxy.
|
||||
func (u *UpgradeDialer) dialWithSocks5Proxy(req *http.Request, proxyURL *url.URL) (net.Conn, error) {
|
||||
// ensure we use a canonical host with proxyReq
|
||||
targetHost := netutil.CanonicalAddr(req.URL)
|
||||
proxyDialAddr := netutil.CanonicalAddr(proxyURL)
|
||||
|
||||
var auth *proxy.Auth
|
||||
if proxyURL.User != nil {
|
||||
pass, _ := proxyURL.User.Password()
|
||||
auth = &proxy.Auth{
|
||||
User: proxyURL.User.Username(),
|
||||
Password: pass,
|
||||
}
|
||||
}
|
||||
|
||||
dialer := u.Dialer
|
||||
if dialer == nil {
|
||||
dialer = &net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
}
|
||||
}
|
||||
|
||||
proxyDialer, err := proxy.SOCKS5("tcp", proxyDialAddr, auth, dialer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// According to the implementation of proxy.SOCKS5, the type assertion will always succeed
|
||||
contextDialer, ok := proxyDialer.(proxy.ContextDialer)
|
||||
if !ok {
|
||||
return nil, errors.New("SOCKS5 Dialer must implement ContextDialer")
|
||||
}
|
||||
|
||||
proxyDialConn, err := contextDialer.DialContext(req.Context(), "tcp", targetHost)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if req.URL.Scheme == "https" {
|
||||
return u.tlsConn(req.Context(), proxyDialConn, targetHost)
|
||||
}
|
||||
return proxyDialConn, nil
|
||||
}
|
||||
|
||||
// tlsConn returns a TLS client side connection using rwc as the underlying transport.
|
||||
func (u *UpgradeDialer) tlsConn(ctx context.Context, rwc net.Conn, targetHost string) (net.Conn, error) {
|
||||
host, _, err := net.SplitHostPort(targetHost)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tlsConfig := u.tlsConfig
|
||||
switch {
|
||||
case tlsConfig == nil:
|
||||
tlsConfig = &tls.Config{ServerName: host, MinVersion: tls.VersionTLS12}
|
||||
case len(tlsConfig.ServerName) == 0:
|
||||
tlsConfig = tlsConfig.Clone()
|
||||
tlsConfig.ServerName = host
|
||||
}
|
||||
|
||||
tlsConn := tls.Client(rwc, tlsConfig)
|
||||
|
||||
if err := tlsConn.HandshakeContext(ctx); err != nil {
|
||||
tlsConn.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return tlsConn, nil
|
||||
}
|
||||
|
||||
// dialWithoutProxy dials the host specified by url, using TLS if appropriate.
|
||||
func (u *UpgradeDialer) dialWithoutProxy(ctx context.Context, url *url.URL) (net.Conn, error) {
|
||||
dialAddr := netutil.CanonicalAddr(url)
|
||||
dialer := u.Dialer
|
||||
if dialer == nil {
|
||||
dialer = &net.Dialer{}
|
||||
}
|
||||
|
||||
if url.Scheme == "http" {
|
||||
return dialer.DialContext(ctx, "tcp", dialAddr)
|
||||
}
|
||||
|
||||
tlsDialer := tls.Dialer{
|
||||
NetDialer: dialer,
|
||||
Config: u.tlsConfig,
|
||||
}
|
||||
return tlsDialer.DialContext(ctx, "tcp", dialAddr)
|
||||
}
|
||||
|
||||
// proxyAuth returns, for a given proxy URL, the value to be used for the Proxy-Authorization header
|
||||
func (u *UpgradeDialer) proxyAuth(proxyURL *url.URL) string {
|
||||
if proxyURL == nil || proxyURL.User == nil {
|
||||
return ""
|
||||
}
|
||||
username := proxyURL.User.Username()
|
||||
password, _ := proxyURL.User.Password()
|
||||
auth := username + ":" + password
|
||||
return "Basic " + base64.StdEncoding.EncodeToString([]byte(auth))
|
||||
}
|
|
@ -0,0 +1,419 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/mxk/go-flowrate/flowrate"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/util/httpstream"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
proxyutil "k8s.io/apimachinery/pkg/util/proxy"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
type UpgradeAwareHandler struct {
|
||||
proxyutil.UpgradeAwareHandler
|
||||
|
||||
UpgradeDialer *UpgradeDialer
|
||||
}
|
||||
|
||||
const defaultFlushInterval = 200 * time.Millisecond
|
||||
|
||||
// normalizeLocation returns the result of parsing the full URL, with scheme set to http if missing
|
||||
func normalizeLocation(location *url.URL) *url.URL {
|
||||
normalized, _ := url.Parse(location.String())
|
||||
if len(normalized.Scheme) == 0 {
|
||||
normalized.Scheme = "http"
|
||||
}
|
||||
return normalized
|
||||
}
|
||||
|
||||
// NewUpgradeAwareHandler creates a new proxy handler with a default flush interval. Responder is required for returning
|
||||
// errors to the caller.
|
||||
func NewUpgradeAwareHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder proxyutil.ErrorResponder) *UpgradeAwareHandler {
|
||||
return &UpgradeAwareHandler{
|
||||
UpgradeAwareHandler: proxyutil.UpgradeAwareHandler{
|
||||
Location: normalizeLocation(location),
|
||||
Transport: transport,
|
||||
WrapTransport: wrapTransport,
|
||||
UpgradeRequired: upgradeRequired,
|
||||
FlushInterval: defaultFlushInterval,
|
||||
Responder: responder,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func proxyRedirectsforRootPath(path string, w http.ResponseWriter, req *http.Request) bool {
|
||||
redirect := false
|
||||
method := req.Method
|
||||
|
||||
// From pkg/genericapiserver/endpoints/handlers/proxy.go#ServeHTTP:
|
||||
// Redirect requests with an empty path to a location that ends with a '/'
|
||||
// This is essentially a hack for https://issue.k8s.io/4958.
|
||||
// Note: Keep this code after tryUpgrade to not break that flow.
|
||||
if len(path) == 0 && (method == http.MethodGet || method == http.MethodHead) {
|
||||
var queryPart string
|
||||
if len(req.URL.RawQuery) > 0 {
|
||||
queryPart = "?" + req.URL.RawQuery
|
||||
}
|
||||
w.Header().Set("Location", req.URL.Path+"/"+queryPart)
|
||||
w.WriteHeader(http.StatusMovedPermanently)
|
||||
redirect = true
|
||||
}
|
||||
return redirect
|
||||
}
|
||||
|
||||
// ServeHTTP handles the proxy request
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
if h.tryUpgrade(w, req) {
|
||||
return
|
||||
}
|
||||
if h.UpgradeRequired {
|
||||
h.Responder.Error(w, req, apierrors.NewBadRequest("Upgrade request required"))
|
||||
return
|
||||
}
|
||||
|
||||
loc := *h.Location
|
||||
loc.RawQuery = req.URL.RawQuery
|
||||
|
||||
// If original request URL ended in '/', append a '/' at the end of the
|
||||
// of the proxy URL
|
||||
if !strings.HasSuffix(loc.Path, "/") && strings.HasSuffix(req.URL.Path, "/") {
|
||||
loc.Path += "/"
|
||||
}
|
||||
|
||||
proxyRedirect := proxyRedirectsforRootPath(loc.Path, w, req)
|
||||
if proxyRedirect {
|
||||
return
|
||||
}
|
||||
|
||||
if h.Transport == nil || h.WrapTransport {
|
||||
h.Transport = h.defaultProxyTransport(req.URL, h.Transport)
|
||||
}
|
||||
|
||||
// WithContext creates a shallow clone of the request with the same context.
|
||||
newReq := req.WithContext(req.Context())
|
||||
newReq.Header = utilnet.CloneHeader(req.Header)
|
||||
if !h.UseRequestLocation {
|
||||
newReq.URL = &loc
|
||||
}
|
||||
if h.UseLocationHost {
|
||||
// exchanging req.Host with the backend location is necessary for backends that act on the HTTP host header (e.g. API gateways),
|
||||
// because req.Host has preference over req.URL.Host in filling this header field
|
||||
newReq.Host = h.Location.Host
|
||||
}
|
||||
|
||||
// create the target location to use for the reverse proxy
|
||||
reverseProxyLocation := &url.URL{Scheme: h.Location.Scheme, Host: h.Location.Host}
|
||||
if h.AppendLocationPath {
|
||||
reverseProxyLocation.Path = h.Location.Path
|
||||
}
|
||||
|
||||
proxy := httputil.NewSingleHostReverseProxy(reverseProxyLocation)
|
||||
proxy.Transport = h.Transport
|
||||
proxy.FlushInterval = h.FlushInterval
|
||||
proxy.ErrorLog = log.New(noSuppressPanicError{}, "", log.LstdFlags)
|
||||
if h.RejectForwardingRedirects {
|
||||
oldModifyResponse := proxy.ModifyResponse
|
||||
proxy.ModifyResponse = func(response *http.Response) error {
|
||||
code := response.StatusCode
|
||||
if code >= 300 && code <= 399 && len(response.Header.Get("Location")) > 0 {
|
||||
// close the original response
|
||||
response.Body.Close()
|
||||
msg := "the backend attempted to redirect this request, which is not permitted"
|
||||
// replace the response
|
||||
*response = http.Response{
|
||||
StatusCode: http.StatusBadGateway,
|
||||
Status: fmt.Sprintf("%d %s", response.StatusCode, http.StatusText(response.StatusCode)),
|
||||
Body: io.NopCloser(strings.NewReader(msg)),
|
||||
ContentLength: int64(len(msg)),
|
||||
}
|
||||
} else {
|
||||
if oldModifyResponse != nil {
|
||||
if err := oldModifyResponse(response); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
if h.Responder != nil {
|
||||
// if an optional error interceptor/responder was provided wire it
|
||||
// the custom responder might be used for providing a unified error reporting
|
||||
// or supporting retry mechanisms by not sending non-fatal errors to the clients
|
||||
proxy.ErrorHandler = h.Responder.Error
|
||||
}
|
||||
proxy.ServeHTTP(w, newReq)
|
||||
}
|
||||
|
||||
type noSuppressPanicError struct{}
|
||||
|
||||
func (noSuppressPanicError) Write(p []byte) (n int, err error) {
|
||||
// skip "suppressing panic for copyResponse error in test; copy error" error message
|
||||
// that ends up in CI tests on each kube-apiserver termination as noise and
|
||||
// everybody thinks this is fatal.
|
||||
if strings.Contains(string(p), "suppressing panic") {
|
||||
return len(p), nil
|
||||
}
|
||||
return os.Stderr.Write(p)
|
||||
}
|
||||
|
||||
// tryUpgrade returns true if the request was handled.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Request) bool {
|
||||
if !httpstream.IsUpgradeRequest(req) {
|
||||
klog.V(6).Infof("Request was not an upgrade")
|
||||
return false
|
||||
}
|
||||
|
||||
var (
|
||||
backendConn net.Conn
|
||||
rawResponse []byte
|
||||
err error
|
||||
)
|
||||
|
||||
location := *h.Location
|
||||
if h.UseRequestLocation {
|
||||
location = *req.URL
|
||||
location.Scheme = h.Location.Scheme
|
||||
location.Host = h.Location.Host
|
||||
if h.AppendLocationPath {
|
||||
location.Path = singleJoiningSlash(h.Location.Path, location.Path)
|
||||
}
|
||||
}
|
||||
|
||||
clone := utilnet.CloneRequest(req)
|
||||
// Only append X-Forwarded-For in the upgrade path, since httputil.NewSingleHostReverseProxy
|
||||
// handles this in the non-upgrade path.
|
||||
utilnet.AppendForwardedForHeader(clone)
|
||||
klog.V(6).Infof("Connecting to backend proxy (direct dial) %s\n Headers: %v", &location, clone.Header)
|
||||
if h.UseLocationHost {
|
||||
clone.Host = h.Location.Host
|
||||
}
|
||||
clone.URL = &location
|
||||
|
||||
//backendConn, err = h.DialForUpgrade(clone)
|
||||
backendConn, err = h.UpgradeDialer.Dial(clone)
|
||||
if err != nil {
|
||||
klog.V(6).Infof("Proxy connection error: %v", err)
|
||||
h.Responder.Error(w, req, err)
|
||||
return true
|
||||
}
|
||||
defer backendConn.Close()
|
||||
|
||||
// determine the http response code from the backend by reading from rawResponse+backendConn
|
||||
backendHTTPResponse, headerBytes, err := getResponse(io.MultiReader(bytes.NewReader(rawResponse), backendConn))
|
||||
if err != nil {
|
||||
klog.V(6).Infof("Proxy connection error: %v", err)
|
||||
h.Responder.Error(w, req, err)
|
||||
return true
|
||||
}
|
||||
if len(headerBytes) > len(rawResponse) {
|
||||
// we read beyond the bytes stored in rawResponse, update rawResponse to the full set of bytes read from the backend
|
||||
rawResponse = headerBytes
|
||||
}
|
||||
|
||||
// If the backend did not upgrade the request, return an error to the client. If the response was
|
||||
// an error, the error is forwarded directly after the connection is hijacked. Otherwise, just
|
||||
// return a generic error here.
|
||||
if backendHTTPResponse.StatusCode != http.StatusSwitchingProtocols && backendHTTPResponse.StatusCode < 400 {
|
||||
err := fmt.Errorf("invalid upgrade response: status code %d", backendHTTPResponse.StatusCode)
|
||||
klog.Errorf("Proxy upgrade error: %v", err)
|
||||
h.Responder.Error(w, req, err)
|
||||
return true
|
||||
}
|
||||
|
||||
// Once the connection is hijacked, the ErrorResponder will no longer work, so
|
||||
// hijacking should be the last step in the upgrade.
|
||||
requestHijacker, ok := w.(http.Hijacker)
|
||||
if !ok {
|
||||
klog.V(6).Infof("Unable to hijack response writer: %T", w)
|
||||
h.Responder.Error(w, req, fmt.Errorf("request connection cannot be hijacked: %T", w))
|
||||
return true
|
||||
}
|
||||
requestHijackedConn, _, err := requestHijacker.Hijack()
|
||||
if err != nil {
|
||||
klog.V(6).Infof("Unable to hijack response: %v", err)
|
||||
h.Responder.Error(w, req, fmt.Errorf("error hijacking connection: %v", err))
|
||||
return true
|
||||
}
|
||||
defer requestHijackedConn.Close()
|
||||
|
||||
if backendHTTPResponse.StatusCode != http.StatusSwitchingProtocols {
|
||||
// If the backend did not upgrade the request, echo the response from the backend to the client and return, closing the connection.
|
||||
klog.V(6).Infof("Proxy upgrade error, status code %d", backendHTTPResponse.StatusCode)
|
||||
// set read/write deadlines
|
||||
deadline := time.Now().Add(10 * time.Second)
|
||||
_ = backendConn.SetReadDeadline(deadline)
|
||||
_ = requestHijackedConn.SetWriteDeadline(deadline)
|
||||
// write the response to the client
|
||||
err := backendHTTPResponse.Write(requestHijackedConn)
|
||||
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
|
||||
klog.Errorf("Error proxying data from backend to client: %v", err)
|
||||
}
|
||||
// Indicate we handled the request
|
||||
return true
|
||||
}
|
||||
|
||||
// Forward raw response bytes back to client.
|
||||
if len(rawResponse) > 0 {
|
||||
klog.V(6).Infof("Writing %d bytes to hijacked connection", len(rawResponse))
|
||||
if _, err = requestHijackedConn.Write(rawResponse); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Error proxying response from backend to client: %v", err))
|
||||
}
|
||||
}
|
||||
|
||||
// Proxy the connection. This is bidirectional, so we need a goroutine
|
||||
// to copy in each direction. Once one side of the connection exits, we
|
||||
// exit the function which performs cleanup and in the process closes
|
||||
// the other half of the connection in the defer.
|
||||
writerComplete := make(chan struct{})
|
||||
readerComplete := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
var writer io.WriteCloser
|
||||
if h.MaxBytesPerSec > 0 {
|
||||
writer = flowrate.NewWriter(backendConn, h.MaxBytesPerSec)
|
||||
} else {
|
||||
writer = backendConn
|
||||
}
|
||||
_, err := io.Copy(writer, requestHijackedConn)
|
||||
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
|
||||
klog.Errorf("Error proxying data from client to backend: %v", err)
|
||||
}
|
||||
close(writerComplete)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
var reader io.ReadCloser
|
||||
if h.MaxBytesPerSec > 0 {
|
||||
reader = flowrate.NewReader(backendConn, h.MaxBytesPerSec)
|
||||
} else {
|
||||
reader = backendConn
|
||||
}
|
||||
_, err := io.Copy(requestHijackedConn, reader)
|
||||
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
|
||||
klog.Errorf("Error proxying data from backend to client: %v", err)
|
||||
}
|
||||
close(readerComplete)
|
||||
}()
|
||||
|
||||
// Wait for one half the connection to exit. Once it does the defer will
|
||||
// clean up the other half of the connection.
|
||||
select {
|
||||
case <-writerComplete:
|
||||
case <-readerComplete:
|
||||
}
|
||||
klog.V(6).Infof("Disconnecting from backend proxy %s\n Headers: %v", &location, clone.Header)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// FIXME: Taken from net/http/httputil/reverseproxy.go as singleJoiningSlash is not exported to be re-used.
|
||||
// See-also: https://github.com/golang/go/issues/44290
|
||||
func singleJoiningSlash(a, b string) string {
|
||||
aslash := strings.HasSuffix(a, "/")
|
||||
bslash := strings.HasPrefix(b, "/")
|
||||
switch {
|
||||
case aslash && bslash:
|
||||
return a + b[1:]
|
||||
case !aslash && !bslash:
|
||||
return a + "/" + b
|
||||
}
|
||||
return a + b
|
||||
}
|
||||
|
||||
// getResponseCode reads a http response from the given reader, returns the response,
|
||||
// the bytes read from the reader, and any error encountered
|
||||
func getResponse(r io.Reader) (*http.Response, []byte, error) {
|
||||
rawResponse := bytes.NewBuffer(make([]byte, 0, 256))
|
||||
// Save the bytes read while reading the response headers into the rawResponse buffer
|
||||
resp, err := http.ReadResponse(bufio.NewReader(io.TeeReader(r, rawResponse)), nil)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
// return the http response and the raw bytes consumed from the reader in the process
|
||||
return resp, rawResponse.Bytes(), nil
|
||||
}
|
||||
|
||||
func (h *UpgradeAwareHandler) defaultProxyTransport(url *url.URL, internalTransport http.RoundTripper) http.RoundTripper {
|
||||
scheme := url.Scheme
|
||||
host := url.Host
|
||||
suffix := h.Location.Path
|
||||
if strings.HasSuffix(url.Path, "/") && !strings.HasSuffix(suffix, "/") {
|
||||
suffix += "/"
|
||||
}
|
||||
pathPrepend := strings.TrimSuffix(url.Path, suffix)
|
||||
rewritingTransport := &proxyutil.Transport{
|
||||
Scheme: scheme,
|
||||
Host: host,
|
||||
PathPrepend: pathPrepend,
|
||||
RoundTripper: internalTransport,
|
||||
}
|
||||
return &corsRemovingTransport{
|
||||
RoundTripper: rewritingTransport,
|
||||
}
|
||||
}
|
||||
|
||||
// corsRemovingTransport is a wrapper for an internal transport. It removes CORS headers
|
||||
// from the internal response.
|
||||
// Implements pkg/util/net.RoundTripperWrapper
|
||||
type corsRemovingTransport struct {
|
||||
http.RoundTripper
|
||||
}
|
||||
|
||||
var _ = utilnet.RoundTripperWrapper(&corsRemovingTransport{})
|
||||
|
||||
func (rt *corsRemovingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
resp, err := rt.RoundTripper.RoundTrip(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
removeCORSHeaders(resp)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (rt *corsRemovingTransport) WrappedRoundTripper() http.RoundTripper {
|
||||
return rt.RoundTripper
|
||||
}
|
||||
|
||||
// removeCORSHeaders strip CORS headers sent from the backend
|
||||
// This should be called on all responses before returning
|
||||
func removeCORSHeaders(resp *http.Response) {
|
||||
resp.Header.Del("Access-Control-Allow-Credentials")
|
||||
resp.Header.Del("Access-Control-Allow-Headers")
|
||||
resp.Header.Del("Access-Control-Allow-Methods")
|
||||
resp.Header.Del("Access-Control-Allow-Origin")
|
||||
}
|
Loading…
Reference in New Issue