Unify generic proxy code in apimachinery

Make the utility package truly generic, consolidate all generic proxy in
apimachinery.

Kubernetes-commit: edc12aafe2fbfe3475bdf03c34ffa26cc4322673
This commit is contained in:
Clayton Coleman 2017-07-07 16:11:34 -04:00 committed by Kubernetes Publisher
parent c521c8f6b5
commit bcc0d0a6d0
9 changed files with 1 additions and 1957 deletions

View File

@ -33,12 +33,12 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/net"
proxyutil "k8s.io/apimachinery/pkg/util/proxy"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/server/httplog"
proxyutil "k8s.io/apiserver/pkg/util/proxy"
"github.com/golang/glog"
)

View File

@ -1,281 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rest
import (
"context"
"fmt"
"io"
"net"
"net/http"
"net/http/httputil"
"net/url"
"strings"
"sync"
"time"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/httpstream"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
genericfeatures "k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/proxy"
"github.com/golang/glog"
"github.com/mxk/go-flowrate/flowrate"
)
// UpgradeAwareProxyHandler is a handler for proxy requests that may require an upgrade
type UpgradeAwareProxyHandler struct {
UpgradeRequired bool
Location *url.URL
// Transport provides an optional round tripper to use to proxy. If nil, the default proxy transport is used
Transport http.RoundTripper
// WrapTransport indicates whether the provided Transport should be wrapped with default proxy transport behavior (URL rewriting, X-Forwarded-* header setting)
WrapTransport bool
// InterceptRedirects determines whether the proxy should sniff backend responses for redirects,
// following them as necessary.
InterceptRedirects bool
FlushInterval time.Duration
MaxBytesPerSec int64
Responder ErrorResponder
}
const defaultFlushInterval = 200 * time.Millisecond
// ErrorResponder abstracts error reporting to the proxy handler to remove the need to hardcode a particular
// error format.
type ErrorResponder interface {
Error(err error)
}
// NewUpgradeAwareProxyHandler creates a new proxy handler with a default flush interval. Responder is required for returning
// errors to the caller.
func NewUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder ErrorResponder) *UpgradeAwareProxyHandler {
return &UpgradeAwareProxyHandler{
Location: location,
Transport: transport,
WrapTransport: wrapTransport,
UpgradeRequired: upgradeRequired,
FlushInterval: defaultFlushInterval,
Responder: responder,
}
}
// ServeHTTP handles the proxy request
func (h *UpgradeAwareProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if len(h.Location.Scheme) == 0 {
h.Location.Scheme = "http"
}
if h.tryUpgrade(w, req) {
return
}
if h.UpgradeRequired {
h.Responder.Error(errors.NewBadRequest("Upgrade request required"))
return
}
loc := *h.Location
loc.RawQuery = req.URL.RawQuery
// If original request URL ended in '/', append a '/' at the end of the
// of the proxy URL
if !strings.HasSuffix(loc.Path, "/") && strings.HasSuffix(req.URL.Path, "/") {
loc.Path += "/"
}
// From pkg/genericapiserver/endpoints/handlers/proxy.go#ServeHTTP:
// Redirect requests with an empty path to a location that ends with a '/'
// This is essentially a hack for http://issue.k8s.io/4958.
// Note: Keep this code after tryUpgrade to not break that flow.
if len(loc.Path) == 0 {
var queryPart string
if len(req.URL.RawQuery) > 0 {
queryPart = "?" + req.URL.RawQuery
}
w.Header().Set("Location", req.URL.Path+"/"+queryPart)
w.WriteHeader(http.StatusMovedPermanently)
return
}
if h.Transport == nil || h.WrapTransport {
h.Transport = h.defaultProxyTransport(req.URL, h.Transport)
}
// WithContext creates a shallow clone of the request with the new context.
newReq := req.WithContext(context.Background())
newReq.Header = utilnet.CloneHeader(req.Header)
newReq.URL = &loc
proxy := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: h.Location.Scheme, Host: h.Location.Host})
proxy.Transport = h.Transport
proxy.FlushInterval = h.FlushInterval
proxy.ServeHTTP(w, newReq)
}
// tryUpgrade returns true if the request was handled.
func (h *UpgradeAwareProxyHandler) tryUpgrade(w http.ResponseWriter, req *http.Request) bool {
if !httpstream.IsUpgradeRequest(req) {
return false
}
var (
backendConn net.Conn
rawResponse []byte
err error
)
clone := utilnet.CloneRequest(req)
// Only append X-Forwarded-For in the upgrade path, since httputil.NewSingleHostReverseProxy
// handles this in the non-upgrade path.
utilnet.AppendForwardedForHeader(clone)
if h.InterceptRedirects && utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StreamingProxyRedirects) {
backendConn, rawResponse, err = utilnet.ConnectWithRedirects(req.Method, h.Location, clone.Header, req.Body, h)
} else {
clone.URL = h.Location
backendConn, err = h.Dial(clone)
}
if err != nil {
h.Responder.Error(err)
return true
}
defer backendConn.Close()
// Once the connection is hijacked, the ErrorResponder will no longer work, so
// hijacking should be the last step in the upgrade.
requestHijacker, ok := w.(http.Hijacker)
if !ok {
h.Responder.Error(fmt.Errorf("request connection cannot be hijacked: %T", w))
return true
}
requestHijackedConn, _, err := requestHijacker.Hijack()
if err != nil {
h.Responder.Error(fmt.Errorf("error hijacking request connection: %v", err))
return true
}
defer requestHijackedConn.Close()
// Forward raw response bytes back to client.
if len(rawResponse) > 0 {
if _, err = requestHijackedConn.Write(rawResponse); err != nil {
utilruntime.HandleError(fmt.Errorf("Error proxying response from backend to client: %v", err))
}
}
// Proxy the connection.
wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
var writer io.WriteCloser
if h.MaxBytesPerSec > 0 {
writer = flowrate.NewWriter(backendConn, h.MaxBytesPerSec)
} else {
writer = backendConn
}
_, err := io.Copy(writer, requestHijackedConn)
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
glog.Errorf("Error proxying data from client to backend: %v", err)
}
wg.Done()
}()
go func() {
var reader io.ReadCloser
if h.MaxBytesPerSec > 0 {
reader = flowrate.NewReader(backendConn, h.MaxBytesPerSec)
} else {
reader = backendConn
}
_, err := io.Copy(requestHijackedConn, reader)
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
glog.Errorf("Error proxying data from backend to client: %v", err)
}
wg.Done()
}()
wg.Wait()
return true
}
// Dial dials the backend at req.URL and writes req to it.
func (h *UpgradeAwareProxyHandler) Dial(req *http.Request) (net.Conn, error) {
conn, err := proxy.DialURL(req.URL, h.Transport)
if err != nil {
return nil, fmt.Errorf("error dialing backend: %v", err)
}
if err = req.Write(conn); err != nil {
conn.Close()
return nil, fmt.Errorf("error sending request: %v", err)
}
return conn, err
}
var _ utilnet.Dialer = &UpgradeAwareProxyHandler{}
func (h *UpgradeAwareProxyHandler) defaultProxyTransport(url *url.URL, internalTransport http.RoundTripper) http.RoundTripper {
scheme := url.Scheme
host := url.Host
suffix := h.Location.Path
if strings.HasSuffix(url.Path, "/") && !strings.HasSuffix(suffix, "/") {
suffix += "/"
}
pathPrepend := strings.TrimSuffix(url.Path, suffix)
rewritingTransport := &proxy.Transport{
Scheme: scheme,
Host: host,
PathPrepend: pathPrepend,
RoundTripper: internalTransport,
}
return &corsRemovingTransport{
RoundTripper: rewritingTransport,
}
}
// corsRemovingTransport is a wrapper for an internal transport. It removes CORS headers
// from the internal response.
// Implements pkg/util/net.RoundTripperWrapper
type corsRemovingTransport struct {
http.RoundTripper
}
var _ = utilnet.RoundTripperWrapper(&corsRemovingTransport{})
func (rt *corsRemovingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
resp, err := rt.RoundTripper.RoundTrip(req)
if err != nil {
return nil, err
}
removeCORSHeaders(resp)
return resp, nil
}
func (rt *corsRemovingTransport) WrappedRoundTripper() http.RoundTripper {
return rt.RoundTripper
}
// removeCORSHeaders strip CORS headers sent from the backend
// This should be called on all responses before returning
func removeCORSHeaders(resp *http.Response) {
resp.Header.Del("Access-Control-Allow-Credentials")
resp.Header.Del("Access-Control-Allow-Headers")
resp.Header.Del("Access-Control-Allow-Methods")
resp.Header.Del("Access-Control-Allow-Origin")
}

View File

@ -1,826 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rest
import (
"bytes"
"compress/gzip"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"net/http/httputil"
"net/url"
"reflect"
"strconv"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/websocket"
"k8s.io/apimachinery/pkg/util/httpstream"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/proxy"
)
const fakeStatusCode = 567
type fakeResponder struct {
t *testing.T
called bool
err error
// called chan error
w http.ResponseWriter
}
func (r *fakeResponder) Error(err error) {
if r.called {
r.t.Errorf("Error responder called again!\nprevious error: %v\nnew error: %v", r.err, err)
}
if r.w != nil {
r.w.WriteHeader(fakeStatusCode)
_, writeErr := r.w.Write([]byte(err.Error()))
assert.NoError(r.t, writeErr)
} else {
r.t.Logf("No ResponseWriter set")
}
r.called = true
r.err = err
}
type fakeConn struct {
err error // The error to return when io is performed over the connection.
}
func (f *fakeConn) Read([]byte) (int, error) { return 0, f.err }
func (f *fakeConn) Write([]byte) (int, error) { return 0, f.err }
func (f *fakeConn) Close() error { return nil }
func (fakeConn) LocalAddr() net.Addr { return nil }
func (fakeConn) RemoteAddr() net.Addr { return nil }
func (fakeConn) SetDeadline(t time.Time) error { return nil }
func (fakeConn) SetReadDeadline(t time.Time) error { return nil }
func (fakeConn) SetWriteDeadline(t time.Time) error { return nil }
type SimpleBackendHandler struct {
requestURL url.URL
requestHeader http.Header
requestBody []byte
requestMethod string
responseBody string
responseHeader map[string]string
t *testing.T
}
func (s *SimpleBackendHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
s.requestURL = *req.URL
s.requestHeader = req.Header
s.requestMethod = req.Method
var err error
s.requestBody, err = ioutil.ReadAll(req.Body)
if err != nil {
s.t.Errorf("Unexpected error: %v", err)
return
}
if s.responseHeader != nil {
for k, v := range s.responseHeader {
w.Header().Add(k, v)
}
}
w.Write([]byte(s.responseBody))
}
func validateParameters(t *testing.T, name string, actual url.Values, expected map[string]string) {
for k, v := range expected {
actualValue, ok := actual[k]
if !ok {
t.Errorf("%s: Expected parameter %s not received", name, k)
continue
}
if actualValue[0] != v {
t.Errorf("%s: Parameter %s values don't match. Actual: %#v, Expected: %s",
name, k, actualValue, v)
}
}
}
func validateHeaders(t *testing.T, name string, actual http.Header, expected map[string]string, notExpected []string) {
for k, v := range expected {
actualValue, ok := actual[k]
if !ok {
t.Errorf("%s: Expected header %s not received", name, k)
continue
}
if actualValue[0] != v {
t.Errorf("%s: Header %s values don't match. Actual: %s, Expected: %s",
name, k, actualValue, v)
}
}
if notExpected == nil {
return
}
for _, h := range notExpected {
if _, present := actual[h]; present {
t.Errorf("%s: unexpected header: %s", name, h)
}
}
}
func TestServeHTTP(t *testing.T) {
tests := []struct {
name string
method string
requestPath string
expectedPath string
requestBody string
requestParams map[string]string
requestHeader map[string]string
responseHeader map[string]string
expectedRespHeader map[string]string
notExpectedRespHeader []string
upgradeRequired bool
expectError func(err error) bool
}{
{
name: "root path, simple get",
method: "GET",
requestPath: "/",
expectedPath: "/",
},
{
name: "no upgrade header sent",
method: "GET",
requestPath: "/",
upgradeRequired: true,
expectError: func(err error) bool {
return err != nil && strings.Contains(err.Error(), "Upgrade request required")
},
},
{
name: "simple path, get",
method: "GET",
requestPath: "/path/to/test",
expectedPath: "/path/to/test",
},
{
name: "request params",
method: "POST",
requestPath: "/some/path/",
expectedPath: "/some/path/",
requestParams: map[string]string{"param1": "value/1", "param2": "value%2"},
requestBody: "test request body",
},
{
name: "request headers",
method: "PUT",
requestPath: "/some/path",
expectedPath: "/some/path",
requestHeader: map[string]string{"Header1": "value1", "Header2": "value2"},
},
{
name: "empty path - slash should be added",
method: "GET",
requestPath: "",
expectedPath: "/",
},
{
name: "remove CORS headers",
method: "GET",
requestPath: "/some/path",
expectedPath: "/some/path",
responseHeader: map[string]string{
"Header1": "value1",
"Access-Control-Allow-Origin": "some.server",
"Access-Control-Allow-Methods": "GET"},
expectedRespHeader: map[string]string{
"Header1": "value1",
},
notExpectedRespHeader: []string{
"Access-Control-Allow-Origin",
"Access-Control-Allow-Methods",
},
},
}
for i, test := range tests {
func() {
backendResponse := "<html><head></head><body><a href=\"/test/path\">Hello</a></body></html>"
backendResponseHeader := test.responseHeader
// Test a simple header if not specified in the test
if backendResponseHeader == nil && test.expectedRespHeader == nil {
backendResponseHeader = map[string]string{"Content-Type": "text/html"}
test.expectedRespHeader = map[string]string{"Content-Type": "text/html"}
}
backendHandler := &SimpleBackendHandler{
responseBody: backendResponse,
responseHeader: backendResponseHeader,
}
backendServer := httptest.NewServer(backendHandler)
defer backendServer.Close()
responder := &fakeResponder{t: t}
backendURL, _ := url.Parse(backendServer.URL)
backendURL.Path = test.requestPath
proxyHandler := &UpgradeAwareProxyHandler{
Location: backendURL,
Responder: responder,
UpgradeRequired: test.upgradeRequired,
}
proxyServer := httptest.NewServer(proxyHandler)
defer proxyServer.Close()
proxyURL, _ := url.Parse(proxyServer.URL)
proxyURL.Path = test.requestPath
paramValues := url.Values{}
for k, v := range test.requestParams {
paramValues[k] = []string{v}
}
proxyURL.RawQuery = paramValues.Encode()
var requestBody io.Reader
if test.requestBody != "" {
requestBody = bytes.NewBufferString(test.requestBody)
}
req, err := http.NewRequest(test.method, proxyURL.String(), requestBody)
if test.requestHeader != nil {
header := http.Header{}
for k, v := range test.requestHeader {
header.Add(k, v)
}
req.Header = header
}
if err != nil {
t.Errorf("Error creating client request: %v", err)
}
client := &http.Client{}
res, err := client.Do(req)
if err != nil {
t.Errorf("Error from proxy request: %v", err)
}
if test.expectError != nil {
if !responder.called {
t.Errorf("%d: responder was not invoked", i)
return
}
if !test.expectError(responder.err) {
t.Errorf("%d: unexpected error: %v", i, responder.err)
}
return
}
// Validate backend request
// Method
if backendHandler.requestMethod != test.method {
t.Errorf("Unexpected request method: %s. Expected: %s",
backendHandler.requestMethod, test.method)
}
// Body
if string(backendHandler.requestBody) != test.requestBody {
t.Errorf("Unexpected request body: %s. Expected: %s",
string(backendHandler.requestBody), test.requestBody)
}
// Path
if backendHandler.requestURL.Path != test.expectedPath {
t.Errorf("Unexpected request path: %s", backendHandler.requestURL.Path)
}
// Parameters
validateParameters(t, test.name, backendHandler.requestURL.Query(), test.requestParams)
// Headers
validateHeaders(t, test.name+" backend request", backendHandler.requestHeader,
test.requestHeader, nil)
// Validate proxy response
// Response Headers
validateHeaders(t, test.name+" backend headers", res.Header, test.expectedRespHeader, test.notExpectedRespHeader)
// Validate Body
responseBody, err := ioutil.ReadAll(res.Body)
if err != nil {
t.Errorf("Unexpected error reading response body: %v", err)
}
if rb := string(responseBody); rb != backendResponse {
t.Errorf("Did not get expected response body: %s. Expected: %s", rb, backendResponse)
}
// Error
if responder.called {
t.Errorf("Unexpected proxy handler error: %v", responder.err)
}
}()
}
}
func TestProxyUpgrade(t *testing.T) {
localhostPool := x509.NewCertPool()
if !localhostPool.AppendCertsFromPEM(localhostCert) {
t.Errorf("error setting up localhostCert pool")
}
testcases := map[string]struct {
ServerFunc func(http.Handler) *httptest.Server
ProxyTransport http.RoundTripper
}{
"http": {
ServerFunc: httptest.NewServer,
ProxyTransport: nil,
},
"https (invalid hostname + InsecureSkipVerify)": {
ServerFunc: func(h http.Handler) *httptest.Server {
cert, err := tls.X509KeyPair(exampleCert, exampleKey)
if err != nil {
t.Errorf("https (invalid hostname): proxy_test: %v", err)
}
ts := httptest.NewUnstartedServer(h)
ts.TLS = &tls.Config{
Certificates: []tls.Certificate{cert},
}
ts.StartTLS()
return ts
},
ProxyTransport: utilnet.SetTransportDefaults(&http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}),
},
"https (valid hostname + RootCAs)": {
ServerFunc: func(h http.Handler) *httptest.Server {
cert, err := tls.X509KeyPair(localhostCert, localhostKey)
if err != nil {
t.Errorf("https (valid hostname): proxy_test: %v", err)
}
ts := httptest.NewUnstartedServer(h)
ts.TLS = &tls.Config{
Certificates: []tls.Certificate{cert},
}
ts.StartTLS()
return ts
},
ProxyTransport: utilnet.SetTransportDefaults(&http.Transport{TLSClientConfig: &tls.Config{RootCAs: localhostPool}}),
},
"https (valid hostname + RootCAs + custom dialer)": {
ServerFunc: func(h http.Handler) *httptest.Server {
cert, err := tls.X509KeyPair(localhostCert, localhostKey)
if err != nil {
t.Errorf("https (valid hostname): proxy_test: %v", err)
}
ts := httptest.NewUnstartedServer(h)
ts.TLS = &tls.Config{
Certificates: []tls.Certificate{cert},
}
ts.StartTLS()
return ts
},
ProxyTransport: utilnet.SetTransportDefaults(&http.Transport{Dial: net.Dial, TLSClientConfig: &tls.Config{RootCAs: localhostPool}}),
},
}
// Enable StreamingProxyRedirects for test.
utilfeature.DefaultFeatureGate.Set(string(features.StreamingProxyRedirects) + "=true")
for k, tc := range testcases {
for _, redirect := range []bool{false, true} {
tcName := k
backendPath := "/hello"
if redirect {
tcName += " with redirect"
backendPath = "/redirect"
}
func() { // Cleanup after each test case.
backend := http.NewServeMux()
backend.Handle("/hello", websocket.Handler(func(ws *websocket.Conn) {
defer ws.Close()
body := make([]byte, 5)
ws.Read(body)
ws.Write([]byte("hello " + string(body)))
}))
backend.Handle("/redirect", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/hello", http.StatusFound)
}))
backendServer := tc.ServerFunc(backend)
defer backendServer.Close()
serverURL, _ := url.Parse(backendServer.URL)
serverURL.Path = backendPath
proxyHandler := &UpgradeAwareProxyHandler{
Location: serverURL,
Transport: tc.ProxyTransport,
InterceptRedirects: redirect,
Responder: &noErrorsAllowed{t: t},
}
proxy := httptest.NewServer(proxyHandler)
defer proxy.Close()
ws, err := websocket.Dial("ws://"+proxy.Listener.Addr().String()+"/some/path", "", "http://127.0.0.1/")
if err != nil {
t.Fatalf("%s: websocket dial err: %s", tcName, err)
}
defer ws.Close()
if _, err := ws.Write([]byte("world")); err != nil {
t.Fatalf("%s: write err: %s", tcName, err)
}
response := make([]byte, 20)
n, err := ws.Read(response)
if err != nil {
t.Fatalf("%s: read err: %s", tcName, err)
}
if e, a := "hello world", string(response[0:n]); e != a {
t.Fatalf("%s: expected '%#v', got '%#v'", tcName, e, a)
}
}()
}
}
}
type noErrorsAllowed struct {
t *testing.T
}
func (r *noErrorsAllowed) Error(err error) {
r.t.Error(err)
}
func TestProxyUpgradeErrorResponse(t *testing.T) {
var (
responder *fakeResponder
expectedErr = errors.New("EXPECTED")
)
proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
transport := http.DefaultTransport.(*http.Transport)
transport.Dial = func(network, addr string) (net.Conn, error) {
return &fakeConn{err: expectedErr}, nil
}
responder = &fakeResponder{t: t, w: w}
proxyHandler := &UpgradeAwareProxyHandler{
Location: &url.URL{
Host: "fake-backend",
},
UpgradeRequired: true,
Responder: responder,
Transport: transport,
}
proxyHandler.ServeHTTP(w, r)
}))
defer proxy.Close()
// Send request to proxy server.
req, err := http.NewRequest("POST", "http://"+proxy.Listener.Addr().String()+"/some/path", nil)
require.NoError(t, err)
req.Header.Set(httpstream.HeaderConnection, httpstream.HeaderUpgrade)
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
// Expect error response.
assert.True(t, responder.called)
assert.Equal(t, fakeStatusCode, resp.StatusCode)
msg, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
assert.Contains(t, string(msg), expectedErr.Error())
}
func TestDefaultProxyTransport(t *testing.T) {
tests := []struct {
name,
url,
location,
expectedScheme,
expectedHost,
expectedPathPrepend string
}{
{
name: "simple path",
url: "http://test.server:8080/a/test/location",
location: "http://localhost/location",
expectedScheme: "http",
expectedHost: "test.server:8080",
expectedPathPrepend: "/a/test",
},
{
name: "empty path",
url: "http://test.server:8080/a/test/",
location: "http://localhost",
expectedScheme: "http",
expectedHost: "test.server:8080",
expectedPathPrepend: "/a/test",
},
{
name: "location ending in slash",
url: "http://test.server:8080/a/test/",
location: "http://localhost/",
expectedScheme: "http",
expectedHost: "test.server:8080",
expectedPathPrepend: "/a/test",
},
}
for _, test := range tests {
locURL, _ := url.Parse(test.location)
URL, _ := url.Parse(test.url)
h := UpgradeAwareProxyHandler{
Location: locURL,
}
result := h.defaultProxyTransport(URL, nil)
transport := result.(*corsRemovingTransport).RoundTripper.(*proxy.Transport)
if transport.Scheme != test.expectedScheme {
t.Errorf("%s: unexpected scheme. Actual: %s, Expected: %s", test.name, transport.Scheme, test.expectedScheme)
}
if transport.Host != test.expectedHost {
t.Errorf("%s: unexpected host. Actual: %s, Expected: %s", test.name, transport.Host, test.expectedHost)
}
if transport.PathPrepend != test.expectedPathPrepend {
t.Errorf("%s: unexpected path prepend. Actual: %s, Expected: %s", test.name, transport.PathPrepend, test.expectedPathPrepend)
}
}
}
func TestProxyRequestContentLengthAndTransferEncoding(t *testing.T) {
chunk := func(data []byte) []byte {
out := &bytes.Buffer{}
chunker := httputil.NewChunkedWriter(out)
for _, b := range data {
if _, err := chunker.Write([]byte{b}); err != nil {
panic(err)
}
}
chunker.Close()
out.Write([]byte("\r\n"))
return out.Bytes()
}
zip := func(data []byte) []byte {
out := &bytes.Buffer{}
zipper := gzip.NewWriter(out)
if _, err := zipper.Write(data); err != nil {
panic(err)
}
zipper.Close()
return out.Bytes()
}
sampleData := []byte("abcde")
table := map[string]struct {
reqHeaders http.Header
reqBody []byte
expectedHeaders http.Header
expectedBody []byte
}{
"content-length": {
reqHeaders: http.Header{
"Content-Length": []string{"5"},
},
reqBody: sampleData,
expectedHeaders: http.Header{
"Content-Length": []string{"5"},
"Content-Encoding": nil, // none set
"Transfer-Encoding": nil, // none set
},
expectedBody: sampleData,
},
"content-length + identity transfer-encoding": {
reqHeaders: http.Header{
"Content-Length": []string{"5"},
"Transfer-Encoding": []string{"identity"},
},
reqBody: sampleData,
expectedHeaders: http.Header{
"Content-Length": []string{"5"},
"Content-Encoding": nil, // none set
"Transfer-Encoding": nil, // gets removed
},
expectedBody: sampleData,
},
"content-length + gzip content-encoding": {
reqHeaders: http.Header{
"Content-Length": []string{strconv.Itoa(len(zip(sampleData)))},
"Content-Encoding": []string{"gzip"},
},
reqBody: zip(sampleData),
expectedHeaders: http.Header{
"Content-Length": []string{strconv.Itoa(len(zip(sampleData)))},
"Content-Encoding": []string{"gzip"},
"Transfer-Encoding": nil, // none set
},
expectedBody: zip(sampleData),
},
"chunked transfer-encoding": {
reqHeaders: http.Header{
"Transfer-Encoding": []string{"chunked"},
},
reqBody: chunk(sampleData),
expectedHeaders: http.Header{
"Content-Length": nil, // none set
"Content-Encoding": nil, // none set
"Transfer-Encoding": nil, // Transfer-Encoding gets removed
},
expectedBody: sampleData, // sample data is unchunked
},
"chunked transfer-encoding + gzip content-encoding": {
reqHeaders: http.Header{
"Content-Encoding": []string{"gzip"},
"Transfer-Encoding": []string{"chunked"},
},
reqBody: chunk(zip(sampleData)),
expectedHeaders: http.Header{
"Content-Length": nil, // none set
"Content-Encoding": []string{"gzip"},
"Transfer-Encoding": nil, // gets removed
},
expectedBody: zip(sampleData), // sample data is unchunked, but content-encoding is preserved
},
// "Transfer-Encoding: gzip" is not supported by go
// See http/transfer.go#fixTransferEncoding (https://golang.org/src/net/http/transfer.go#L427)
// Once it is supported, this test case should succeed
//
// "gzip+chunked transfer-encoding": {
// reqHeaders: http.Header{
// "Transfer-Encoding": []string{"chunked,gzip"},
// },
// reqBody: chunk(zip(sampleData)),
//
// expectedHeaders: http.Header{
// "Content-Length": nil, // no content-length headers
// "Transfer-Encoding": nil, // Transfer-Encoding gets removed
// },
// expectedBody: sampleData,
// },
}
successfulResponse := "backend passed tests"
for k, item := range table {
// Start the downstream server
downstreamServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
// Verify headers
for header, v := range item.expectedHeaders {
if !reflect.DeepEqual(v, req.Header[header]) {
t.Errorf("%s: Expected headers for %s to be %v, got %v", k, header, v, req.Header[header])
}
}
// Read body
body, err := ioutil.ReadAll(req.Body)
if err != nil {
t.Errorf("%s: unexpected error %v", k, err)
}
req.Body.Close()
// Verify length
if req.ContentLength > 0 && req.ContentLength != int64(len(body)) {
t.Errorf("%s: ContentLength was %d, len(data) was %d", k, req.ContentLength, len(body))
}
// Verify content
if !bytes.Equal(item.expectedBody, body) {
t.Errorf("%s: Expected %q, got %q", k, string(item.expectedBody), string(body))
}
// Write successful response
w.Write([]byte(successfulResponse))
}))
defer downstreamServer.Close()
responder := &fakeResponder{t: t}
backendURL, _ := url.Parse(downstreamServer.URL)
proxyHandler := &UpgradeAwareProxyHandler{
Location: backendURL,
Responder: responder,
UpgradeRequired: false,
}
proxyServer := httptest.NewServer(proxyHandler)
defer proxyServer.Close()
// Dial the proxy server
conn, err := net.Dial(proxyServer.Listener.Addr().Network(), proxyServer.Listener.Addr().String())
if err != nil {
t.Errorf("unexpected error %v", err)
continue
}
defer conn.Close()
// Add standard http 1.1 headers
if item.reqHeaders == nil {
item.reqHeaders = http.Header{}
}
item.reqHeaders.Add("Connection", "close")
item.reqHeaders.Add("Host", proxyServer.Listener.Addr().String())
// Write the request headers
if _, err := fmt.Fprint(conn, "POST / HTTP/1.1\r\n"); err != nil {
t.Fatalf("%s unexpected error %v", k, err)
}
for header, values := range item.reqHeaders {
for _, value := range values {
if _, err := fmt.Fprintf(conn, "%s: %s\r\n", header, value); err != nil {
t.Fatalf("%s: unexpected error %v", k, err)
}
}
}
// Header separator
if _, err := fmt.Fprint(conn, "\r\n"); err != nil {
t.Fatalf("%s: unexpected error %v", k, err)
}
// Body
if _, err := conn.Write(item.reqBody); err != nil {
t.Fatalf("%s: unexpected error %v", k, err)
}
// Read response
response, err := ioutil.ReadAll(conn)
if err != nil {
t.Errorf("%s: unexpected error %v", k, err)
continue
}
if !strings.HasSuffix(string(response), successfulResponse) {
t.Errorf("%s: Did not get successful response: %s", k, string(response))
continue
}
}
}
// exampleCert was generated from crypto/tls/generate_cert.go with the following command:
// go run generate_cert.go --rsa-bits 512 --host example.com --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h
var exampleCert = []byte(`-----BEGIN CERTIFICATE-----
MIIBdzCCASGgAwIBAgIRAOVTAdPnfbS5V85mfS90TfIwDQYJKoZIhvcNAQELBQAw
EjEQMA4GA1UEChMHQWNtZSBDbzAgFw03MDAxMDEwMDAwMDBaGA8yMDg0MDEyOTE2
MDAwMFowEjEQMA4GA1UEChMHQWNtZSBDbzBcMA0GCSqGSIb3DQEBAQUAA0sAMEgC
QQCoVSqeu8TBvF+70T7Jm4340YQNhds6IxjRoifenYodAO1dnKGrcbF266DJGunh
nIjQH7B12tduhl0fLK4Ezf7/AgMBAAGjUDBOMA4GA1UdDwEB/wQEAwICpDATBgNV
HSUEDDAKBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MBYGA1UdEQQPMA2CC2V4
YW1wbGUuY29tMA0GCSqGSIb3DQEBCwUAA0EAk1kVa5uZ/AzwYDVcS9bpM/czwjjV
xq3VeSCfmNa2uNjbFvodmCRwZOHUvipAMGCUCV6j5vMrJ8eMj8tCQ36W9A==
-----END CERTIFICATE-----`)
var exampleKey = []byte(`-----BEGIN RSA PRIVATE KEY-----
MIIBOgIBAAJBAKhVKp67xMG8X7vRPsmbjfjRhA2F2zojGNGiJ96dih0A7V2coatx
sXbroMka6eGciNAfsHXa126GXR8srgTN/v8CAwEAAQJASdzdD7vKsUwMIejGCUb1
fAnLTPfAY3lFCa+CmR89nE22dAoRDv+5RbnBsZ58BazPNJHrsVPRlfXB3OQmSQr0
SQIhANoJhs+xOJE/i8nJv0uAbzKyiD1YkvRkta0GpUOULyAVAiEAxaQus3E/SuqD
P7y5NeJnE7X6XkyC35zrsJRkz7orE8MCIHdDjsI8pjyNDeGqwUCDWE/a6DrmIDwe
emHSqMN2YvChAiEAnxLCM9NWaenOsaIoP+J1rDuvw+4499nJKVqGuVrSCRkCIEqK
4KSchPMc3x8M/uhw9oWTtKFmjA/PPh0FsWCdKrEy
-----END RSA PRIVATE KEY-----`)
// localhostCert was generated from crypto/tls/generate_cert.go with the following command:
// go run generate_cert.go --rsa-bits 512 --host 127.0.0.1,::1,example.com --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h
var localhostCert = []byte(`-----BEGIN CERTIFICATE-----
MIIBjzCCATmgAwIBAgIRAKpi2WmTcFrVjxrl5n5YDUEwDQYJKoZIhvcNAQELBQAw
EjEQMA4GA1UEChMHQWNtZSBDbzAgFw03MDAxMDEwMDAwMDBaGA8yMDg0MDEyOTE2
MDAwMFowEjEQMA4GA1UEChMHQWNtZSBDbzBcMA0GCSqGSIb3DQEBAQUAA0sAMEgC
QQC9fEbRszP3t14Gr4oahV7zFObBI4TfA5i7YnlMXeLinb7MnvT4bkfOJzE6zktn
59zP7UiHs3l4YOuqrjiwM413AgMBAAGjaDBmMA4GA1UdDwEB/wQEAwICpDATBgNV
HSUEDDAKBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MC4GA1UdEQQnMCWCC2V4
YW1wbGUuY29thwR/AAABhxAAAAAAAAAAAAAAAAAAAAABMA0GCSqGSIb3DQEBCwUA
A0EAUsVE6KMnza/ZbodLlyeMzdo7EM/5nb5ywyOxgIOCf0OOLHsPS9ueGLQX9HEG
//yjTXuhNcUugExIjM/AIwAZPQ==
-----END CERTIFICATE-----`)
// localhostKey is the private key for localhostCert.
var localhostKey = []byte(`-----BEGIN RSA PRIVATE KEY-----
MIIBOwIBAAJBAL18RtGzM/e3XgavihqFXvMU5sEjhN8DmLtieUxd4uKdvsye9Phu
R84nMTrOS2fn3M/tSIezeXhg66quOLAzjXcCAwEAAQJBAKcRxH9wuglYLBdI/0OT
BLzfWPZCEw1vZmMR2FF1Fm8nkNOVDPleeVGTWoOEcYYlQbpTmkGSxJ6ya+hqRi6x
goECIQDx3+X49fwpL6B5qpJIJMyZBSCuMhH4B7JevhGGFENi3wIhAMiNJN5Q3UkL
IuSvv03kaPR5XVQ99/UeEetUgGvBcABpAiBJSBzVITIVCGkGc7d+RCf49KTCIklv
bGWObufAR8Ni4QIgWpILjW8dkGg8GOUZ0zaNA6Nvt6TIv2UWGJ4v5PoV98kCIQDx
rIiZs5QbKdycsv9gQJzwQAogC8o04X3Zz3dsoX+h4A==
-----END RSA PRIVATE KEY-----`)

View File

@ -1,51 +0,0 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = [
"dial_test.go",
"proxy_test.go",
"transport_test.go",
],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],
)
go_library(
name = "go_default_library",
srcs = [
"dial.go",
"doc.go",
"proxy.go",
"transport.go",
],
tags = ["automanaged"],
deps = [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/golang.org/x/net/html:go_default_library",
"//vendor/golang.org/x/net/html/atom:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/third_party/forked/golang/netutil:go_default_library",
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
],
)

View File

@ -1,106 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"crypto/tls"
"fmt"
"net"
"net/http"
"net/url"
"github.com/golang/glog"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/third_party/forked/golang/netutil"
)
func DialURL(url *url.URL, transport http.RoundTripper) (net.Conn, error) {
dialAddr := netutil.CanonicalAddr(url)
dialer, _ := utilnet.DialerFor(transport)
switch url.Scheme {
case "http":
if dialer != nil {
return dialer("tcp", dialAddr)
}
return net.Dial("tcp", dialAddr)
case "https":
// Get the tls config from the transport if we recognize it
var tlsConfig *tls.Config
var tlsConn *tls.Conn
var err error
tlsConfig, _ = utilnet.TLSClientConfig(transport)
if dialer != nil {
// We have a dialer; use it to open the connection, then
// create a tls client using the connection.
netConn, err := dialer("tcp", dialAddr)
if err != nil {
return nil, err
}
if tlsConfig == nil {
// tls.Client requires non-nil config
glog.Warningf("using custom dialer with no TLSClientConfig. Defaulting to InsecureSkipVerify")
// tls.Handshake() requires ServerName or InsecureSkipVerify
tlsConfig = &tls.Config{
InsecureSkipVerify: true,
}
} else if len(tlsConfig.ServerName) == 0 && !tlsConfig.InsecureSkipVerify {
// tls.Handshake() requires ServerName or InsecureSkipVerify
// infer the ServerName from the hostname we're connecting to.
inferredHost := dialAddr
if host, _, err := net.SplitHostPort(dialAddr); err == nil {
inferredHost = host
}
// Make a copy to avoid polluting the provided config
tlsConfigCopy := tlsConfig.Clone()
tlsConfigCopy.ServerName = inferredHost
tlsConfig = tlsConfigCopy
}
tlsConn = tls.Client(netConn, tlsConfig)
if err := tlsConn.Handshake(); err != nil {
netConn.Close()
return nil, err
}
} else {
// Dial
tlsConn, err = tls.Dial("tcp", dialAddr, tlsConfig)
if err != nil {
return nil, err
}
}
// Return if we were configured to skip validation
if tlsConfig != nil && tlsConfig.InsecureSkipVerify {
return tlsConn, nil
}
// Verify
host, _, _ := net.SplitHostPort(dialAddr)
if err := tlsConn.VerifyHostname(host); err != nil {
tlsConn.Close()
return nil, err
}
return tlsConn, nil
default:
return nil, fmt.Errorf("Unknown scheme: %s", url.Scheme)
}
}

View File

@ -1,172 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"strings"
"testing"
utilnet "k8s.io/apimachinery/pkg/util/net"
)
func TestDialURL(t *testing.T) {
roots := x509.NewCertPool()
if !roots.AppendCertsFromPEM(localhostCert) {
t.Fatal("error setting up localhostCert pool")
}
cert, err := tls.X509KeyPair(localhostCert, localhostKey)
if err != nil {
t.Fatal(err)
}
testcases := map[string]struct {
TLSConfig *tls.Config
Dial utilnet.DialFunc
ExpectError string
}{
"insecure": {
TLSConfig: &tls.Config{InsecureSkipVerify: true},
},
"secure, no roots": {
TLSConfig: &tls.Config{InsecureSkipVerify: false},
ExpectError: "unknown authority",
},
"secure with roots": {
TLSConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: roots},
},
"secure with mismatched server": {
TLSConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: roots, ServerName: "bogus.com"},
ExpectError: "not bogus.com",
},
"secure with matched server": {
TLSConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: roots, ServerName: "example.com"},
},
"insecure, custom dial": {
TLSConfig: &tls.Config{InsecureSkipVerify: true},
Dial: net.Dial,
},
"secure, no roots, custom dial": {
TLSConfig: &tls.Config{InsecureSkipVerify: false},
Dial: net.Dial,
ExpectError: "unknown authority",
},
"secure with roots, custom dial": {
TLSConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: roots},
Dial: net.Dial,
},
"secure with mismatched server, custom dial": {
TLSConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: roots, ServerName: "bogus.com"},
Dial: net.Dial,
ExpectError: "not bogus.com",
},
"secure with matched server, custom dial": {
TLSConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: roots, ServerName: "example.com"},
Dial: net.Dial,
},
}
for k, tc := range testcases {
func() {
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {}))
defer ts.Close()
ts.TLS = &tls.Config{Certificates: []tls.Certificate{cert}}
ts.StartTLS()
tlsConfigCopy := tc.TLSConfig
transport := &http.Transport{
Dial: tc.Dial,
TLSClientConfig: tlsConfigCopy,
}
extractedDial, err := utilnet.DialerFor(transport)
if err != nil {
t.Fatal(err)
}
if fmt.Sprintf("%p", extractedDial) != fmt.Sprintf("%p", tc.Dial) {
t.Fatalf("%s: Unexpected dial", k)
}
extractedTLSConfig, err := utilnet.TLSClientConfig(transport)
if err != nil {
t.Fatal(err)
}
if extractedTLSConfig == nil {
t.Fatalf("%s: Expected tlsConfig", k)
}
u, _ := url.Parse(ts.URL)
_, p, _ := net.SplitHostPort(u.Host)
u.Host = net.JoinHostPort("127.0.0.1", p)
conn, err := DialURL(u, transport)
// Make sure dialing doesn't mutate the transport's TLSConfig
if !reflect.DeepEqual(tc.TLSConfig, tlsConfigCopy) {
t.Errorf("%s: transport's copy of TLSConfig was mutated\n%#v\n\n%#v", k, tc.TLSConfig, tlsConfigCopy)
}
if err != nil {
if tc.ExpectError == "" {
t.Errorf("%s: expected no error, got %q", k, err.Error())
}
if !strings.Contains(err.Error(), tc.ExpectError) {
t.Errorf("%s: expected error containing %q, got %q", k, tc.ExpectError, err.Error())
}
return
}
conn.Close()
if tc.ExpectError != "" {
t.Errorf("%s: expected error %q, got none", k, tc.ExpectError)
}
}()
}
}
// localhostCert was generated from crypto/tls/generate_cert.go with the following command:
// go run generate_cert.go --rsa-bits 512 --host 127.0.0.1,::1,example.com --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h
var localhostCert = []byte(`-----BEGIN CERTIFICATE-----
MIIBjzCCATmgAwIBAgIRAKpi2WmTcFrVjxrl5n5YDUEwDQYJKoZIhvcNAQELBQAw
EjEQMA4GA1UEChMHQWNtZSBDbzAgFw03MDAxMDEwMDAwMDBaGA8yMDg0MDEyOTE2
MDAwMFowEjEQMA4GA1UEChMHQWNtZSBDbzBcMA0GCSqGSIb3DQEBAQUAA0sAMEgC
QQC9fEbRszP3t14Gr4oahV7zFObBI4TfA5i7YnlMXeLinb7MnvT4bkfOJzE6zktn
59zP7UiHs3l4YOuqrjiwM413AgMBAAGjaDBmMA4GA1UdDwEB/wQEAwICpDATBgNV
HSUEDDAKBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MC4GA1UdEQQnMCWCC2V4
YW1wbGUuY29thwR/AAABhxAAAAAAAAAAAAAAAAAAAAABMA0GCSqGSIb3DQEBCwUA
A0EAUsVE6KMnza/ZbodLlyeMzdo7EM/5nb5ywyOxgIOCf0OOLHsPS9ueGLQX9HEG
//yjTXuhNcUugExIjM/AIwAZPQ==
-----END CERTIFICATE-----`)
// localhostKey is the private key for localhostCert.
var localhostKey = []byte(`-----BEGIN RSA PRIVATE KEY-----
MIIBOwIBAAJBAL18RtGzM/e3XgavihqFXvMU5sEjhN8DmLtieUxd4uKdvsye9Phu
R84nMTrOS2fn3M/tSIezeXhg66quOLAzjXcCAwEAAQJBAKcRxH9wuglYLBdI/0OT
BLzfWPZCEw1vZmMR2FF1Fm8nkNOVDPleeVGTWoOEcYYlQbpTmkGSxJ6ya+hqRi6x
goECIQDx3+X49fwpL6B5qpJIJMyZBSCuMhH4B7JevhGGFENi3wIhAMiNJN5Q3UkL
IuSvv03kaPR5XVQ99/UeEetUgGvBcABpAiBJSBzVITIVCGkGc7d+RCf49KTCIklv
bGWObufAR8Ni4QIgWpILjW8dkGg8GOUZ0zaNA6Nvt6TIv2UWGJ4v5PoV98kCIQDx
rIiZs5QbKdycsv9gQJzwQAogC8o04X3Zz3dsoX+h4A==
-----END RSA PRIVATE KEY-----`)

View File

@ -1,18 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package proxy provides transport and upgrade support for proxies
package proxy // import "k8s.io/apiserver/pkg/util/proxy"

View File

@ -1,241 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"bytes"
"compress/gzip"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"path"
"strings"
"github.com/golang/glog"
"golang.org/x/net/html"
"golang.org/x/net/html/atom"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/sets"
)
// atomsToAttrs states which attributes of which tags require URL substitution.
// Sources: http://www.w3.org/TR/REC-html40/index/attributes.html
// http://www.w3.org/html/wg/drafts/html/master/index.html#attributes-1
var atomsToAttrs = map[atom.Atom]sets.String{
atom.A: sets.NewString("href"),
atom.Applet: sets.NewString("codebase"),
atom.Area: sets.NewString("href"),
atom.Audio: sets.NewString("src"),
atom.Base: sets.NewString("href"),
atom.Blockquote: sets.NewString("cite"),
atom.Body: sets.NewString("background"),
atom.Button: sets.NewString("formaction"),
atom.Command: sets.NewString("icon"),
atom.Del: sets.NewString("cite"),
atom.Embed: sets.NewString("src"),
atom.Form: sets.NewString("action"),
atom.Frame: sets.NewString("longdesc", "src"),
atom.Head: sets.NewString("profile"),
atom.Html: sets.NewString("manifest"),
atom.Iframe: sets.NewString("longdesc", "src"),
atom.Img: sets.NewString("longdesc", "src", "usemap"),
atom.Input: sets.NewString("src", "usemap", "formaction"),
atom.Ins: sets.NewString("cite"),
atom.Link: sets.NewString("href"),
atom.Object: sets.NewString("classid", "codebase", "data", "usemap"),
atom.Q: sets.NewString("cite"),
atom.Script: sets.NewString("src"),
atom.Source: sets.NewString("src"),
atom.Video: sets.NewString("poster", "src"),
// TODO: css URLs hidden in style elements.
}
// Transport is a transport for text/html content that replaces URLs in html
// content with the prefix of the proxy server
type Transport struct {
Scheme string
Host string
PathPrepend string
http.RoundTripper
}
// RoundTrip implements the http.RoundTripper interface
func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
// Add reverse proxy headers.
forwardedURI := path.Join(t.PathPrepend, req.URL.Path)
if strings.HasSuffix(req.URL.Path, "/") {
forwardedURI = forwardedURI + "/"
}
req.Header.Set("X-Forwarded-Uri", forwardedURI)
if len(t.Host) > 0 {
req.Header.Set("X-Forwarded-Host", t.Host)
}
if len(t.Scheme) > 0 {
req.Header.Set("X-Forwarded-Proto", t.Scheme)
}
rt := t.RoundTripper
if rt == nil {
rt = http.DefaultTransport
}
resp, err := rt.RoundTrip(req)
if err != nil {
message := fmt.Sprintf("Error: '%s'\nTrying to reach: '%v'", err.Error(), req.URL.String())
resp = &http.Response{
StatusCode: http.StatusServiceUnavailable,
Body: ioutil.NopCloser(strings.NewReader(message)),
}
return resp, nil
}
if redirect := resp.Header.Get("Location"); redirect != "" {
resp.Header.Set("Location", t.rewriteURL(redirect, req.URL))
return resp, nil
}
cType := resp.Header.Get("Content-Type")
cType = strings.TrimSpace(strings.SplitN(cType, ";", 2)[0])
if cType != "text/html" {
// Do nothing, simply pass through
return resp, nil
}
return t.rewriteResponse(req, resp)
}
var _ = net.RoundTripperWrapper(&Transport{})
func (rt *Transport) WrappedRoundTripper() http.RoundTripper {
return rt.RoundTripper
}
// rewriteURL rewrites a single URL to go through the proxy, if the URL refers
// to the same host as sourceURL, which is the page on which the target URL
// occurred. If any error occurs (e.g. parsing), it returns targetURL.
func (t *Transport) rewriteURL(targetURL string, sourceURL *url.URL) string {
url, err := url.Parse(targetURL)
if err != nil {
return targetURL
}
isDifferentHost := url.Host != "" && url.Host != sourceURL.Host
isRelative := !strings.HasPrefix(url.Path, "/")
if isDifferentHost || isRelative {
return targetURL
}
url.Scheme = t.Scheme
url.Host = t.Host
origPath := url.Path
// Do not rewrite URL if the sourceURL already contains the necessary prefix.
if strings.HasPrefix(url.Path, t.PathPrepend) {
return url.String()
}
url.Path = path.Join(t.PathPrepend, url.Path)
if strings.HasSuffix(origPath, "/") {
// Add back the trailing slash, which was stripped by path.Join().
url.Path += "/"
}
return url.String()
}
// rewriteHTML scans the HTML for tags with url-valued attributes, and updates
// those values with the urlRewriter function. The updated HTML is output to the
// writer.
func rewriteHTML(reader io.Reader, writer io.Writer, urlRewriter func(string) string) error {
// Note: This assumes the content is UTF-8.
tokenizer := html.NewTokenizer(reader)
var err error
for err == nil {
tokenType := tokenizer.Next()
switch tokenType {
case html.ErrorToken:
err = tokenizer.Err()
case html.StartTagToken, html.SelfClosingTagToken:
token := tokenizer.Token()
if urlAttrs, ok := atomsToAttrs[token.DataAtom]; ok {
for i, attr := range token.Attr {
if urlAttrs.Has(attr.Key) {
token.Attr[i].Val = urlRewriter(attr.Val)
}
}
}
_, err = writer.Write([]byte(token.String()))
default:
_, err = writer.Write(tokenizer.Raw())
}
}
if err != io.EOF {
return err
}
return nil
}
// rewriteResponse modifies an HTML response by updating absolute links referring
// to the original host to instead refer to the proxy transport.
func (t *Transport) rewriteResponse(req *http.Request, resp *http.Response) (*http.Response, error) {
origBody := resp.Body
defer origBody.Close()
newContent := &bytes.Buffer{}
var reader io.Reader = origBody
var writer io.Writer = newContent
encoding := resp.Header.Get("Content-Encoding")
switch encoding {
case "gzip":
var err error
reader, err = gzip.NewReader(reader)
if err != nil {
return nil, fmt.Errorf("errorf making gzip reader: %v", err)
}
gzw := gzip.NewWriter(writer)
defer gzw.Close()
writer = gzw
// TODO: support flate, other encodings.
case "":
// This is fine
default:
// Some encoding we don't understand-- don't try to parse this
glog.Errorf("Proxy encountered encoding %v for text/html; can't understand this so not fixing links.", encoding)
return resp, nil
}
urlRewriter := func(targetUrl string) string {
return t.rewriteURL(targetUrl, req.URL)
}
err := rewriteHTML(reader, writer, urlRewriter)
if err != nil {
glog.Errorf("Failed to rewrite URLs: %v", err)
return resp, err
}
resp.Body = ioutil.NopCloser(newContent)
// Update header node with new content-length
// TODO: Remove any hash/signature headers here?
resp.Header.Del("Content-Length")
resp.ContentLength = int64(newContent.Len())
return resp, err
}

View File

@ -1,261 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
)
func parseURLOrDie(inURL string) *url.URL {
parsed, err := url.Parse(inURL)
if err != nil {
panic(err)
}
return parsed
}
func TestProxyTransport(t *testing.T) {
testTransport := &Transport{
Scheme: "http",
Host: "foo.com",
PathPrepend: "/proxy/node/node1:10250",
}
testTransport2 := &Transport{
Scheme: "https",
Host: "foo.com",
PathPrepend: "/proxy/node/node1:8080",
}
emptyHostTransport := &Transport{
Scheme: "https",
PathPrepend: "/proxy/node/node1:10250",
}
emptySchemeTransport := &Transport{
Host: "foo.com",
PathPrepend: "/proxy/node/node1:10250",
}
type Item struct {
input string
sourceURL string
transport *Transport
output string
contentType string
forwardedURI string
redirect string
redirectWant string
}
table := map[string]Item{
"normal": {
input: `<pre><a href="kubelet.log">kubelet.log</a><a href="/google.log">google.log</a></pre>`,
sourceURL: "http://mynode.com/logs/log.log",
transport: testTransport,
output: `<pre><a href="kubelet.log">kubelet.log</a><a href="http://foo.com/proxy/node/node1:10250/google.log">google.log</a></pre>`,
contentType: "text/html",
forwardedURI: "/proxy/node/node1:10250/logs/log.log",
},
"full document": {
input: `<html><header></header><body><pre><a href="kubelet.log">kubelet.log</a><a href="/google.log">google.log</a></pre></body></html>`,
sourceURL: "http://mynode.com/logs/log.log",
transport: testTransport,
output: `<html><header></header><body><pre><a href="kubelet.log">kubelet.log</a><a href="http://foo.com/proxy/node/node1:10250/google.log">google.log</a></pre></body></html>`,
contentType: "text/html",
forwardedURI: "/proxy/node/node1:10250/logs/log.log",
},
"trailing slash": {
input: `<pre><a href="kubelet.log">kubelet.log</a><a href="/google.log/">google.log</a></pre>`,
sourceURL: "http://mynode.com/logs/log.log",
transport: testTransport,
output: `<pre><a href="kubelet.log">kubelet.log</a><a href="http://foo.com/proxy/node/node1:10250/google.log/">google.log</a></pre>`,
contentType: "text/html",
forwardedURI: "/proxy/node/node1:10250/logs/log.log",
},
"content-type charset": {
input: `<pre><a href="kubelet.log">kubelet.log</a><a href="/google.log">google.log</a></pre>`,
sourceURL: "http://mynode.com/logs/log.log",
transport: testTransport,
output: `<pre><a href="kubelet.log">kubelet.log</a><a href="http://foo.com/proxy/node/node1:10250/google.log">google.log</a></pre>`,
contentType: "text/html; charset=utf-8",
forwardedURI: "/proxy/node/node1:10250/logs/log.log",
},
"content-type passthrough": {
input: `<pre><a href="kubelet.log">kubelet.log</a><a href="/google.log">google.log</a></pre>`,
sourceURL: "http://mynode.com/logs/log.log",
transport: testTransport,
output: `<pre><a href="kubelet.log">kubelet.log</a><a href="/google.log">google.log</a></pre>`,
contentType: "text/plain",
forwardedURI: "/proxy/node/node1:10250/logs/log.log",
},
"subdir": {
input: `<a href="kubelet.log">kubelet.log</a><a href="/google.log">google.log</a>`,
sourceURL: "http://mynode.com/whatever/apt/somelog.log",
transport: testTransport2,
output: `<a href="kubelet.log">kubelet.log</a><a href="https://foo.com/proxy/node/node1:8080/google.log">google.log</a>`,
contentType: "text/html",
forwardedURI: "/proxy/node/node1:8080/whatever/apt/somelog.log",
},
"image": {
input: `<pre><img src="kubernetes.jpg"/><img src="/kubernetes_abs.jpg"/></pre>`,
sourceURL: "http://mynode.com/",
transport: testTransport,
output: `<pre><img src="kubernetes.jpg"/><img src="http://foo.com/proxy/node/node1:10250/kubernetes_abs.jpg"/></pre>`,
contentType: "text/html",
forwardedURI: "/proxy/node/node1:10250/",
},
"abs": {
input: `<script src="http://google.com/kubernetes.js"/>`,
sourceURL: "http://mynode.com/any/path/",
transport: testTransport,
output: `<script src="http://google.com/kubernetes.js"/>`,
contentType: "text/html",
forwardedURI: "/proxy/node/node1:10250/any/path/",
},
"abs but same host": {
input: `<script src="http://mynode.com/kubernetes.js"/>`,
sourceURL: "http://mynode.com/any/path/",
transport: testTransport,
output: `<script src="http://foo.com/proxy/node/node1:10250/kubernetes.js"/>`,
contentType: "text/html",
forwardedURI: "/proxy/node/node1:10250/any/path/",
},
"redirect rel": {
sourceURL: "http://mynode.com/redirect",
transport: testTransport,
redirect: "/redirected/target/",
redirectWant: "http://foo.com/proxy/node/node1:10250/redirected/target/",
forwardedURI: "/proxy/node/node1:10250/redirect",
},
"redirect abs same host": {
sourceURL: "http://mynode.com/redirect",
transport: testTransport,
redirect: "http://mynode.com/redirected/target/",
redirectWant: "http://foo.com/proxy/node/node1:10250/redirected/target/",
forwardedURI: "/proxy/node/node1:10250/redirect",
},
"redirect abs other host": {
sourceURL: "http://mynode.com/redirect",
transport: testTransport,
redirect: "http://example.com/redirected/target/",
redirectWant: "http://example.com/redirected/target/",
forwardedURI: "/proxy/node/node1:10250/redirect",
},
"source contains the redirect already": {
input: `<pre><a href="kubelet.log">kubelet.log</a><a href="http://foo.com/proxy/node/node1:10250/google.log">google.log</a></pre>`,
sourceURL: "http://foo.com/logs/log.log",
transport: testTransport,
output: `<pre><a href="kubelet.log">kubelet.log</a><a href="http://foo.com/proxy/node/node1:10250/google.log">google.log</a></pre>`,
contentType: "text/html",
forwardedURI: "/proxy/node/node1:10250/logs/log.log",
},
"no host": {
input: "<html></html>",
sourceURL: "http://mynode.com/logs/log.log",
transport: emptyHostTransport,
output: "<html></html>",
contentType: "text/html",
forwardedURI: "/proxy/node/node1:10250/logs/log.log",
},
"no scheme": {
input: "<html></html>",
sourceURL: "http://mynode.com/logs/log.log",
transport: emptySchemeTransport,
output: "<html></html>",
contentType: "text/html",
forwardedURI: "/proxy/node/node1:10250/logs/log.log",
},
}
testItem := func(name string, item *Item) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Check request headers.
if got, want := r.Header.Get("X-Forwarded-Uri"), item.forwardedURI; got != want {
t.Errorf("%v: X-Forwarded-Uri = %q, want %q", name, got, want)
}
if len(item.transport.Host) == 0 {
_, present := r.Header["X-Forwarded-Host"]
if present {
t.Errorf("%v: X-Forwarded-Host header should not be present", name)
}
} else {
if got, want := r.Header.Get("X-Forwarded-Host"), item.transport.Host; got != want {
t.Errorf("%v: X-Forwarded-Host = %q, want %q", name, got, want)
}
}
if len(item.transport.Scheme) == 0 {
_, present := r.Header["X-Forwarded-Proto"]
if present {
t.Errorf("%v: X-Forwarded-Proto header should not be present", name)
}
} else {
if got, want := r.Header.Get("X-Forwarded-Proto"), item.transport.Scheme; got != want {
t.Errorf("%v: X-Forwarded-Proto = %q, want %q", name, got, want)
}
}
// Send response.
if item.redirect != "" {
http.Redirect(w, r, item.redirect, http.StatusMovedPermanently)
return
}
w.Header().Set("Content-Type", item.contentType)
fmt.Fprint(w, item.input)
}))
defer server.Close()
// Replace source URL with our test server address.
sourceURL := parseURLOrDie(item.sourceURL)
serverURL := parseURLOrDie(server.URL)
item.input = strings.Replace(item.input, sourceURL.Host, serverURL.Host, -1)
item.redirect = strings.Replace(item.redirect, sourceURL.Host, serverURL.Host, -1)
sourceURL.Host = serverURL.Host
req, err := http.NewRequest("GET", sourceURL.String(), nil)
if err != nil {
t.Errorf("%v: Unexpected error: %v", name, err)
return
}
resp, err := item.transport.RoundTrip(req)
if err != nil {
t.Errorf("%v: Unexpected error: %v", name, err)
return
}
if item.redirect != "" {
// Check that redirect URLs get rewritten properly.
if got, want := resp.Header.Get("Location"), item.redirectWant; got != want {
t.Errorf("%v: Location header = %q, want %q", name, got, want)
}
return
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Errorf("%v: Unexpected error: %v", name, err)
return
}
if e, a := item.output, string(body); e != a {
t.Errorf("%v: expected %v, but got %v", name, e, a)
}
}
for name, item := range table {
testItem(name, &item)
}
}