421 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			421 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
| /*
 | |
| Copyright 2017 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package proxy
 | |
| 
 | |
| import (
 | |
| 	"bufio"
 | |
| 	"bytes"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"log"
 | |
| 	"net"
 | |
| 	"net/http"
 | |
| 	"net/http/httputil"
 | |
| 	"net/url"
 | |
| 	"os"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/mxk/go-flowrate/flowrate"
 | |
| 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 | |
| 	"k8s.io/apimachinery/pkg/util/httpstream"
 | |
| 	utilnet "k8s.io/apimachinery/pkg/util/net"
 | |
| 	proxyutil "k8s.io/apimachinery/pkg/util/proxy"
 | |
| 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 | |
| 	"k8s.io/klog/v2"
 | |
| )
 | |
| 
 | |
| // UpgradeAwareHandler is an interface for dialing a backend for an upgrade request
 | |
| type UpgradeAwareHandler struct {
 | |
| 	proxyutil.UpgradeAwareHandler
 | |
| 
 | |
| 	UpgradeDialer *UpgradeDialer
 | |
| }
 | |
| 
 | |
| const defaultFlushInterval = 200 * time.Millisecond
 | |
| 
 | |
// normalizeLocation returns a normalized copy of location: the URL is
// serialized and parsed again, and the scheme is set to "http" when missing.
// The input URL is never modified.
//
// If the serialized form cannot be parsed back (for example a host containing
// characters that are escaped on output but rejected on input), a shallow copy
// of location is used instead so that callers always receive a non-nil URL.
func normalizeLocation(location *url.URL) *url.URL {
	normalized, err := url.Parse(location.String())
	if err != nil {
		// Previously the error was dropped and the nil result dereferenced
		// below; fall back to the caller's fields as they are.
		fallback := *location
		normalized = &fallback
	}
	if len(normalized.Scheme) == 0 {
		normalized.Scheme = "http"
	}
	return normalized
}
 | |
| 
 | |
| // NewUpgradeAwareHandler creates a new proxy handler with a default flush interval. Responder is required for returning
 | |
| // errors to the caller.
 | |
| func NewUpgradeAwareHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder proxyutil.ErrorResponder) *UpgradeAwareHandler {
 | |
| 	return &UpgradeAwareHandler{
 | |
| 		UpgradeAwareHandler: proxyutil.UpgradeAwareHandler{
 | |
| 			Location:        normalizeLocation(location),
 | |
| 			Transport:       transport,
 | |
| 			WrapTransport:   wrapTransport,
 | |
| 			UpgradeRequired: upgradeRequired,
 | |
| 			FlushInterval:   defaultFlushInterval,
 | |
| 			Responder:       responder,
 | |
| 		},
 | |
| 	}
 | |
| }
 | |
| 
 | |
// proxyRedirectsforRootPath answers GET and HEAD requests whose target path is
// empty with a 301 redirect to the request path plus a trailing '/', keeping
// the raw query string. It reports whether the redirect was written; when it
// returns false nothing has been written to w.
//
// From pkg/genericapiserver/endpoints/handlers/proxy.go#ServeHTTP: this is
// essentially a hack for https://issue.k8s.io/4958.
// Note: callers must invoke this after tryUpgrade to not break that flow.
func proxyRedirectsforRootPath(path string, w http.ResponseWriter, req *http.Request) bool {
	if path != "" {
		return false
	}
	switch req.Method {
	case http.MethodGet, http.MethodHead:
		// only these methods are redirected
	default:
		return false
	}

	target := req.URL.Path + "/"
	if rawQuery := req.URL.RawQuery; rawQuery != "" {
		target += "?" + rawQuery
	}
	w.Header().Set("Location", target)
	w.WriteHeader(http.StatusMovedPermanently)
	return true
}
 | |
| 
 | |
| // ServeHTTP handles the proxy request
 | |
| //
 | |
| //nolint:gocyclo
 | |
| func (h *UpgradeAwareHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 | |
| 	if h.tryUpgrade(w, req) {
 | |
| 		return
 | |
| 	}
 | |
| 	if h.UpgradeRequired {
 | |
| 		h.Responder.Error(w, req, apierrors.NewBadRequest("Upgrade request required"))
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	loc := *h.Location
 | |
| 	loc.RawQuery = req.URL.RawQuery
 | |
| 
 | |
| 	// If original request URL ended in '/', append a '/' at the end of the
 | |
| 	// of the proxy URL
 | |
| 	if !strings.HasSuffix(loc.Path, "/") && strings.HasSuffix(req.URL.Path, "/") {
 | |
| 		loc.Path += "/"
 | |
| 	}
 | |
| 
 | |
| 	proxyRedirect := proxyRedirectsforRootPath(loc.Path, w, req)
 | |
| 	if proxyRedirect {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if h.Transport == nil || h.WrapTransport {
 | |
| 		h.Transport = h.defaultProxyTransport(req.URL, h.Transport)
 | |
| 	}
 | |
| 
 | |
| 	// WithContext creates a shallow clone of the request with the same context.
 | |
| 	newReq := req.WithContext(req.Context())
 | |
| 	newReq.Header = utilnet.CloneHeader(req.Header)
 | |
| 	if !h.UseRequestLocation {
 | |
| 		newReq.URL = &loc
 | |
| 	}
 | |
| 	if h.UseLocationHost {
 | |
| 		// exchanging req.Host with the backend location is necessary for backends that act on the HTTP host header (e.g. API gateways),
 | |
| 		// because req.Host has preference over req.URL.Host in filling this header field
 | |
| 		newReq.Host = h.Location.Host
 | |
| 	}
 | |
| 
 | |
| 	// create the target location to use for the reverse proxy
 | |
| 	reverseProxyLocation := &url.URL{Scheme: h.Location.Scheme, Host: h.Location.Host}
 | |
| 	if h.AppendLocationPath {
 | |
| 		reverseProxyLocation.Path = h.Location.Path
 | |
| 	}
 | |
| 
 | |
| 	proxy := httputil.NewSingleHostReverseProxy(reverseProxyLocation)
 | |
| 	proxy.Transport = h.Transport
 | |
| 	proxy.FlushInterval = h.FlushInterval
 | |
| 	proxy.ErrorLog = log.New(noSuppressPanicError{}, "", log.LstdFlags)
 | |
| 	if h.RejectForwardingRedirects {
 | |
| 		oldModifyResponse := proxy.ModifyResponse
 | |
| 		proxy.ModifyResponse = func(response *http.Response) error {
 | |
| 			code := response.StatusCode
 | |
| 			if code >= 300 && code <= 399 && len(response.Header.Get("Location")) > 0 {
 | |
| 				// close the original response
 | |
| 				response.Body.Close()
 | |
| 				msg := "the backend attempted to redirect this request, which is not permitted"
 | |
| 				// replace the response
 | |
| 				*response = http.Response{
 | |
| 					StatusCode:    http.StatusBadGateway,
 | |
| 					Status:        fmt.Sprintf("%d %s", response.StatusCode, http.StatusText(response.StatusCode)),
 | |
| 					Body:          io.NopCloser(strings.NewReader(msg)),
 | |
| 					ContentLength: int64(len(msg)),
 | |
| 				}
 | |
| 			} else {
 | |
| 				if oldModifyResponse != nil {
 | |
| 					if err := oldModifyResponse(response); err != nil {
 | |
| 						return err
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 			return nil
 | |
| 		}
 | |
| 	}
 | |
| 	if h.Responder != nil {
 | |
| 		// if an optional error interceptor/responder was provided wire it
 | |
| 		// the custom responder might be used for providing a unified error reporting
 | |
| 		// or supporting retry mechanisms by not sending non-fatal errors to the clients
 | |
| 		proxy.ErrorHandler = h.Responder.Error
 | |
| 	}
 | |
| 	proxy.ServeHTTP(w, newReq)
 | |
| }
 | |
| 
 | |
// noSuppressPanicError is an io.Writer used as the sink of the reverse proxy's
// error logger. It forwards everything to os.Stderr except messages that
// contain "suppressing panic".
type noSuppressPanicError struct{}

// Write drops p (while reporting it as fully written) when it contains the
// "suppressing panic" marker, and otherwise writes p to os.Stderr.
//
// The dropped message is "suppressing panic for copyResponse error in test;
// copy error", which shows up in CI on every kube-apiserver termination and is
// routinely mistaken for a fatal problem.
func (noSuppressPanicError) Write(p []byte) (n int, err error) {
	const noiseMarker = "suppressing panic"
	if !strings.Contains(string(p), noiseMarker) {
		return os.Stderr.Write(p)
	}
	return len(p), nil
}
 | |
| 
 | |
| // tryUpgrade returns true if the request was handled.
 | |
| //
 | |
| //nolint:gocyclo
 | |
| func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Request) bool {
 | |
| 	if !httpstream.IsUpgradeRequest(req) {
 | |
| 		klog.V(6).Infof("Request was not an upgrade")
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	var (
 | |
| 		backendConn net.Conn
 | |
| 		rawResponse []byte
 | |
| 		err         error
 | |
| 	)
 | |
| 
 | |
| 	location := *h.Location
 | |
| 	if h.UseRequestLocation {
 | |
| 		location = *req.URL
 | |
| 		location.Scheme = h.Location.Scheme
 | |
| 		location.Host = h.Location.Host
 | |
| 		if h.AppendLocationPath {
 | |
| 			location.Path = singleJoiningSlash(h.Location.Path, location.Path)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	clone := utilnet.CloneRequest(req)
 | |
| 	// Only append X-Forwarded-For in the upgrade path, since httputil.NewSingleHostReverseProxy
 | |
| 	// handles this in the non-upgrade path.
 | |
| 	utilnet.AppendForwardedForHeader(clone)
 | |
| 	klog.V(6).Infof("Connecting to backend proxy (direct dial) %s\n  Headers: %v", &location, clone.Header)
 | |
| 	if h.UseLocationHost {
 | |
| 		clone.Host = h.Location.Host
 | |
| 	}
 | |
| 	clone.URL = &location
 | |
| 
 | |
| 	//backendConn, err = h.DialForUpgrade(clone)
 | |
| 	backendConn, err = h.UpgradeDialer.Dial(clone)
 | |
| 	if err != nil {
 | |
| 		klog.V(6).Infof("Proxy connection error: %v", err)
 | |
| 		h.Responder.Error(w, req, err)
 | |
| 		return true
 | |
| 	}
 | |
| 	defer backendConn.Close()
 | |
| 
 | |
| 	// determine the http response code from the backend by reading from rawResponse+backendConn
 | |
| 	backendHTTPResponse, headerBytes, err := getResponse(io.MultiReader(bytes.NewReader(rawResponse), backendConn))
 | |
| 	if err != nil {
 | |
| 		klog.V(6).Infof("Proxy connection error: %v", err)
 | |
| 		h.Responder.Error(w, req, err)
 | |
| 		return true
 | |
| 	}
 | |
| 	if len(headerBytes) > len(rawResponse) {
 | |
| 		// we read beyond the bytes stored in rawResponse, update rawResponse to the full set of bytes read from the backend
 | |
| 		rawResponse = headerBytes
 | |
| 	}
 | |
| 
 | |
| 	// If the backend did not upgrade the request, return an error to the client. If the response was
 | |
| 	// an error, the error is forwarded directly after the connection is hijacked. Otherwise, just
 | |
| 	// return a generic error here.
 | |
| 	if backendHTTPResponse.StatusCode != http.StatusSwitchingProtocols && backendHTTPResponse.StatusCode < 400 {
 | |
| 		err := fmt.Errorf("invalid upgrade response: status code %d", backendHTTPResponse.StatusCode)
 | |
| 		klog.Errorf("Proxy upgrade error: %v", err)
 | |
| 		h.Responder.Error(w, req, err)
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	// Once the connection is hijacked, the ErrorResponder will no longer work, so
 | |
| 	// hijacking should be the last step in the upgrade.
 | |
| 	requestHijacker, ok := w.(http.Hijacker)
 | |
| 	if !ok {
 | |
| 		klog.V(6).Infof("Unable to hijack response writer: %T", w)
 | |
| 		h.Responder.Error(w, req, fmt.Errorf("request connection cannot be hijacked: %T", w))
 | |
| 		return true
 | |
| 	}
 | |
| 	requestHijackedConn, _, err := requestHijacker.Hijack()
 | |
| 	if err != nil {
 | |
| 		klog.V(6).Infof("Unable to hijack response: %v", err)
 | |
| 		h.Responder.Error(w, req, fmt.Errorf("error hijacking connection: %v", err))
 | |
| 		return true
 | |
| 	}
 | |
| 	defer requestHijackedConn.Close()
 | |
| 
 | |
| 	if backendHTTPResponse.StatusCode != http.StatusSwitchingProtocols {
 | |
| 		// If the backend did not upgrade the request, echo the response from the backend to the client and return, closing the connection.
 | |
| 		klog.V(6).Infof("Proxy upgrade error, status code %d", backendHTTPResponse.StatusCode)
 | |
| 		// set read/write deadlines
 | |
| 		deadline := time.Now().Add(10 * time.Second)
 | |
| 		_ = backendConn.SetReadDeadline(deadline)
 | |
| 		_ = requestHijackedConn.SetWriteDeadline(deadline)
 | |
| 		// write the response to the client
 | |
| 		err := backendHTTPResponse.Write(requestHijackedConn)
 | |
| 		if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
 | |
| 			klog.Errorf("Error proxying data from backend to client: %v", err)
 | |
| 		}
 | |
| 		// Indicate we handled the request
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	// Forward raw response bytes back to client.
 | |
| 	if len(rawResponse) > 0 {
 | |
| 		klog.V(6).Infof("Writing %d bytes to hijacked connection", len(rawResponse))
 | |
| 		if _, err = requestHijackedConn.Write(rawResponse); err != nil {
 | |
| 			utilruntime.HandleError(fmt.Errorf("error proxying response from backend to client: %v", err))
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Proxy the connection. This is bidirectional, so we need a goroutine
 | |
| 	// to copy in each direction. Once one side of the connection exits, we
 | |
| 	// exit the function which performs cleanup and in the process closes
 | |
| 	// the other half of the connection in the defer.
 | |
| 	writerComplete := make(chan struct{})
 | |
| 	readerComplete := make(chan struct{})
 | |
| 
 | |
| 	go func() {
 | |
| 		var writer io.WriteCloser
 | |
| 		if h.MaxBytesPerSec > 0 {
 | |
| 			writer = flowrate.NewWriter(backendConn, h.MaxBytesPerSec)
 | |
| 		} else {
 | |
| 			writer = backendConn
 | |
| 		}
 | |
| 		_, err := io.Copy(writer, requestHijackedConn)
 | |
| 		if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
 | |
| 			klog.Errorf("Error proxying data from client to backend: %v", err)
 | |
| 		}
 | |
| 		close(writerComplete)
 | |
| 	}()
 | |
| 
 | |
| 	go func() {
 | |
| 		var reader io.ReadCloser
 | |
| 		if h.MaxBytesPerSec > 0 {
 | |
| 			reader = flowrate.NewReader(backendConn, h.MaxBytesPerSec)
 | |
| 		} else {
 | |
| 			reader = backendConn
 | |
| 		}
 | |
| 		_, err := io.Copy(requestHijackedConn, reader)
 | |
| 		if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
 | |
| 			klog.Errorf("Error proxying data from backend to client: %v", err)
 | |
| 		}
 | |
| 		close(readerComplete)
 | |
| 	}()
 | |
| 
 | |
| 	// Wait for one half the connection to exit. Once it does the defer will
 | |
| 	// clean up the other half of the connection.
 | |
| 	select {
 | |
| 	case <-writerComplete:
 | |
| 	case <-readerComplete:
 | |
| 	}
 | |
| 	klog.V(6).Infof("Disconnecting from backend proxy %s\n  Headers: %v", &location, clone.Header)
 | |
| 
 | |
| 	return true
 | |
| }
 | |
| 
 | |
// singleJoiningSlash concatenates a and b so that exactly one '/' separates
// them at the seam: a duplicate slash is collapsed and a missing one is added.
//
// FIXME: Taken from net/http/httputil/reverseproxy.go as singleJoiningSlash is not exported to be re-used.
// See-also: https://github.com/golang/go/issues/44290
func singleJoiningSlash(a, b string) string {
	trailing := strings.HasSuffix(a, "/")
	leading := strings.HasPrefix(b, "/")
	if trailing && leading {
		// drop the duplicate slash
		return a + b[1:]
	}
	if trailing || leading {
		// exactly one side already provides the slash
		return a + b
	}
	return a + "/" + b
}
 | |
| 
 | |
// getResponse reads one HTTP response from r. It returns the parsed response
// together with every byte that was pulled from r while parsing it, which may
// extend past the header section because the parser reads through a buffer.
// On a parse error both the response and the byte slice are nil.
func getResponse(r io.Reader) (*http.Response, []byte, error) {
	var consumed bytes.Buffer
	consumed.Grow(256)

	// Everything the parser reads from r is mirrored into consumed.
	mirrored := io.TeeReader(r, &consumed)
	resp, err := http.ReadResponse(bufio.NewReader(mirrored), nil)
	if err != nil {
		return nil, nil, err
	}
	return resp, consumed.Bytes(), nil
}
 | |
| 
 | |
| func (h *UpgradeAwareHandler) defaultProxyTransport(url *url.URL, internalTransport http.RoundTripper) http.RoundTripper {
 | |
| 	scheme := url.Scheme
 | |
| 	host := url.Host
 | |
| 	suffix := h.Location.Path
 | |
| 	if strings.HasSuffix(url.Path, "/") && !strings.HasSuffix(suffix, "/") {
 | |
| 		suffix += "/"
 | |
| 	}
 | |
| 	pathPrepend := strings.TrimSuffix(url.Path, suffix)
 | |
| 	rewritingTransport := &proxyutil.Transport{
 | |
| 		Scheme:       scheme,
 | |
| 		Host:         host,
 | |
| 		PathPrepend:  pathPrepend,
 | |
| 		RoundTripper: internalTransport,
 | |
| 	}
 | |
| 	return &corsRemovingTransport{
 | |
| 		RoundTripper: rewritingTransport,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // corsRemovingTransport is a wrapper for an internal transport. It removes CORS headers
 | |
| // from the internal response.
 | |
| // Implements pkg/util/net.RoundTripperWrapper
 | |
| type corsRemovingTransport struct {
 | |
| 	http.RoundTripper
 | |
| }
 | |
| 
 | |
| var _ = utilnet.RoundTripperWrapper(&corsRemovingTransport{})
 | |
| 
 | |
| func (rt *corsRemovingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
 | |
| 	resp, err := rt.RoundTripper.RoundTrip(req)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	removeCORSHeaders(resp)
 | |
| 	return resp, nil
 | |
| }
 | |
| 
 | |
| func (rt *corsRemovingTransport) WrappedRoundTripper() http.RoundTripper {
 | |
| 	return rt.RoundTripper
 | |
| }
 | |
| 
 | |
// removeCORSHeaders strips the CORS headers sent by the backend from resp.
// It should be called on all responses before they are returned. Only the
// four Access-Control-Allow-* headers listed below are deleted; every other
// header is left in place.
func removeCORSHeaders(resp *http.Response) {
	for _, name := range []string{
		"Access-Control-Allow-Credentials",
		"Access-Control-Allow-Headers",
		"Access-Control-Allow-Methods",
		"Access-Control-Allow-Origin",
	} {
		resp.Header.Del(name)
	}
}
 |