Move generic packages from serving to pkg. (#927)

* Move generic packages from serving to pkg.

I was asked to move this to do some re-use in Eventing.

/cc @chizh
/assign mattmoor @tcnghia

* fix the compile error
This commit is contained in:
Victor Agababov 2019-12-06 17:57:40 -08:00 committed by Knative Prow Robot
parent eec28e7bbd
commit 37bd04dc3e
11 changed files with 1438 additions and 1 deletions

5
Gopkg.lock generated
View File

@ -618,13 +618,14 @@
[[projects]]
branch = "master"
digest = "1:5578b99717f08e6480d7e0480f758749c12f9cc5da19a33a863dc7307fd699fb"
digest = "1:ab30187b3fce52d6bdedddd463ac788423992846b617ddda81d4fd1e04c3d342"
name = "golang.org/x/net"
packages = [
"context",
"context/ctxhttp",
"http/httpguts",
"http2",
"http2/h2c",
"http2/hpack",
"idna",
"internal/timeseries",
@ -1364,6 +1365,8 @@
"go.uber.org/zap/zapcore",
"go.uber.org/zap/zaptest",
"golang.org/x/net/context",
"golang.org/x/net/http2",
"golang.org/x/net/http2/h2c",
"golang.org/x/oauth2",
"golang.org/x/sync/errgroup",
"google.golang.org/api/container/v1beta1",

View File

@ -2,3 +2,4 @@
approvers:
- tcnghia
- vagababov

43
network/error_handler.go Normal file
View File

@ -0,0 +1,43 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package network
import (
"io/ioutil"
"net/http"
"go.uber.org/zap"
)
// ErrorHandler sets up a handler suitable for use with the ErrorHandler field on
// httputil's reverse proxy.
func ErrorHandler(logger *zap.SugaredLogger) func(http.ResponseWriter, *http.Request, error) {
return func(w http.ResponseWriter, req *http.Request, err error) {
ss := readSockStat(logger)
logger.Errorw("error reverse proxying request; sockstat: "+ss, zap.Error(err))
http.Error(w, err.Error(), http.StatusBadGateway)
}
}
func readSockStat(logger *zap.SugaredLogger) string {
b, err := ioutil.ReadFile("/proc/net/sockstat")
if err != nil {
logger.Errorw("Unable to read sockstat", zap.Error(err))
return ""
}
return string(b)
}

54
network/h2c.go Normal file
View File

@ -0,0 +1,54 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package network
import (
"crypto/tls"
"net"
"net/http"
"time"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
)
// NewServer returns a new HTTP Server with HTTP2 handler.
func NewServer(addr string, h http.Handler) *http.Server {
h1s := &http.Server{
Addr: addr,
Handler: h2c.NewHandler(h, &http2.Server{}),
}
return h1s
}
// NewH2CTransport constructs a new H2C transport.
// That transport will reroute all HTTPS traffic to HTTP. This is
// to explicitly allow h2c (http2 without TLS) transport.
// See https://github.com/golang/go/issues/14141 for more details.
func NewH2CTransport() http.RoundTripper {
return &http2.Transport{
AllowHTTP: true,
DialTLS: func(netw, addr string, cfg *tls.Config) (net.Conn, error) {
d := &net.Dialer{
Timeout: DefaultConnTimeout,
KeepAlive: 5 * time.Second,
DualStack: true,
}
return d.Dial(netw, addr)
},
}
}

45
network/network.go Normal file
View File

@ -0,0 +1,45 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package network
import (
"time"
)
const (
// DefaultConnTimeout specifies a short default connection timeout
// to avoid hitting the issue fixed in
// https://github.com/kubernetes/kubernetes/pull/72534 but only
// avalailable after Kubernetes 1.14.
//
// Our connections are usually between pods in the same cluster
// like activator <-> queue-proxy, or even between containers
// within the same pod queue-proxy <-> user-container, so a
// smaller connect timeout would be justifiable.
//
// We should consider exposing this as a configuration.
DefaultConnTimeout = 200 * time.Millisecond
// UserAgentKey is the constant for header "User-Agent".
UserAgentKey = "User-Agent"
// ProbeHeaderName is the name of a header that can be added to
// requests to probe the knative networking layer. Requests
// with this header will not be passed to the user container or
// included in request metrics.
ProbeHeaderName = "K-Network-Probe"
)

18
network/prober/doc.go Normal file
View File

@ -0,0 +1,18 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package prober contains functionality for implementing probing in knative serving.
package prober

192
network/prober/prober.go Normal file
View File

@ -0,0 +1,192 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prober
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"knative.dev/pkg/logging"
)
// Preparer is a way for the caller to modify the HTTP request before it goes out.
type Preparer func(r *http.Request) *http.Request
// Verifier is a way for the caller to validate the HTTP response after it comes back.
type Verifier func(r *http.Response, b []byte) (bool, error)
// WithHeader sets a header in the probe request.
func WithHeader(name, value string) Preparer {
return func(r *http.Request) *http.Request {
r.Header.Set(name, value)
return r
}
}
// WithHost sets the host in the probe request.
func WithHost(host string) Preparer {
return func(r *http.Request) *http.Request {
r.Host = host
return r
}
}
// ExpectsBody validates that the body of the probe response matches the provided string.
func ExpectsBody(body string) Verifier {
return func(r *http.Response, b []byte) (bool, error) {
if string(b) == body {
return true, nil
}
return false, fmt.Errorf("unexpected body: want %q, got %q", body, string(b))
}
}
// ExpectsHeader validates that the given header of the probe response matches the provided string.
func ExpectsHeader(name, value string) Verifier {
return func(r *http.Response, _ []byte) (bool, error) {
if r.Header.Get(name) == value {
return true, nil
}
return false, fmt.Errorf("unexpected header %q: want %q, got %q", name, value, r.Header.Get(name))
}
}
// ExpectsStatusCodes validates that the given status code of the probe response matches the provided int.
func ExpectsStatusCodes(statusCodes []int) Verifier {
return func(r *http.Response, _ []byte) (bool, error) {
for _, v := range statusCodes {
if r.StatusCode == v {
return true, nil
}
}
return false, fmt.Errorf("unexpected status code: want %v, got %v", statusCodes, r.StatusCode)
}
}
// Do sends a single probe to given target, e.g. `http://revision.default.svc.cluster.local:81`.
// Do returns whether the probe was successful or not, or there was an error probing.
func Do(ctx context.Context, transport http.RoundTripper, target string, ops ...interface{}) (bool, error) {
req, err := http.NewRequest(http.MethodGet, target, nil)
if err != nil {
return false, fmt.Errorf("%s is not a valid URL: %v", target, err)
}
for _, op := range ops {
if po, ok := op.(Preparer); ok {
req = po(req)
}
}
req = req.WithContext(ctx)
resp, err := transport.RoundTrip(req)
if err != nil {
return false, fmt.Errorf("error roundtripping %s: %v", target, err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return false, fmt.Errorf("error reading body: %v", err)
}
for _, op := range ops {
if vo, ok := op.(Verifier); ok {
if ok, err := vo(resp, body); err != nil || !ok {
return false, err
}
}
}
return true, nil
}
// Done is a callback that is executed when the async probe has finished.
// `arg` is given by the caller at the offering time, while `success` and `err`
// are the return values of the `Do` call.
// It is assumed that the opaque arg is consistent for a given target and
// we will coalesce concurrent Offer invocations on target.
type Done func(arg interface{}, success bool, err error)
// Manager manages async probes and makes sure we run concurrently only a single
// probe for the same key.
type Manager struct {
cb Done
// NB: it is paramount to use a transport that will close the connection
// after every request here. Otherwise the cached connections will prohibit
// scaling to zero, due to unsuccessful probes to the Activator.
transport http.RoundTripper
// mu guards keys.
mu sync.Mutex
keys sets.String
}
// New creates a new Manager, that will invoke the given callback when
// async probing is finished.
func New(cb Done, transport http.RoundTripper) *Manager {
return &Manager{
keys: sets.NewString(),
cb: cb,
transport: transport,
}
}
// Offer executes asynchronous probe using `target` as the key.
// If a probe with the same key already exists, Offer will return false and the
// call is discarded. If the request is accepted, Offer returns true.
// Otherwise Offer starts a goroutine that periodically executes
// `Do`, until timeout is reached, the probe succeeds, or fails with an error.
// In the end the callback is invoked with the provided `arg` and probing results.
func (m *Manager) Offer(ctx context.Context, target string, arg interface{}, period, timeout time.Duration, ops ...interface{}) bool {
m.mu.Lock()
defer m.mu.Unlock()
if m.keys.Has(target) {
return false
}
m.keys.Insert(target)
m.doAsync(ctx, target, arg, period, timeout, ops...)
return true
}
// doAsync starts a go routine that probes the target with given period.
func (m *Manager) doAsync(ctx context.Context, target string, arg interface{}, period, timeout time.Duration, ops ...interface{}) {
logger := logging.FromContext(ctx)
go func() {
defer func() {
m.mu.Lock()
defer m.mu.Unlock()
m.keys.Delete(target)
}()
var (
result bool
inErr error
)
err := wait.PollImmediate(period, timeout, func() (bool, error) {
result, inErr = Do(ctx, m.transport, target, ops...)
// Do not return error, which is from verifierError, as retry is expected until timeout.
return result, nil
})
if inErr != nil {
logger.Errorw("Unable to read sockstat", zap.Error(inErr))
}
m.cb(arg, result, err)
}()
}

View File

@ -0,0 +1,362 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prober
import (
"context"
"net"
"net/http"
"net/http/httptest"
"testing"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"knative.dev/pkg/network"
)
const (
systemName = "test-server"
unexpectedProbeMessage = "unexpected probe header value: whatever"
probeInterval = 10 * time.Millisecond
probeTimeout = 200 * time.Millisecond
)
func probeServeFunc(w http.ResponseWriter, r *http.Request) {
s := r.Header.Get(network.ProbeHeaderName)
switch s {
case "":
// No header.
w.WriteHeader(http.StatusNotFound)
case systemName:
// Expected header value.
w.Write([]byte(systemName))
default:
// Unexpected header value.
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(unexpectedProbeMessage))
}
}
func TestDoServing(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(probeServeFunc))
defer ts.Close()
tests := []struct {
name string
headerValue string
want bool
expErr bool
}{{
name: "ok",
headerValue: systemName,
want: true,
expErr: false,
}, {
name: "wrong system",
headerValue: "bells-and-whistles",
want: false,
expErr: true,
}, {
name: "no header",
headerValue: "",
want: false,
expErr: true,
}}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got, err := Do(context.Background(), network.NewAutoTransport(), ts.URL, WithHeader(network.ProbeHeaderName, test.headerValue), ExpectsBody(systemName), ExpectsStatusCodes([]int{http.StatusOK}))
if want := test.want; got != want {
t.Errorf("Got = %v, want: %v", got, want)
}
if err != nil && !test.expErr {
t.Errorf("Do() = %v, no error expected", err)
}
if err == nil && test.expErr {
t.Errorf("Do() = nil, expected an error")
}
})
}
}
func TestBlackHole(t *testing.T) {
transport := &http.Transport{
Dial: (&net.Dialer{
Timeout: 10 * time.Millisecond,
}).Dial,
}
got, err := Do(context.Background(), transport, "http://gone.fishing.svc.custer.local:8080", ExpectsStatusCodes([]int{http.StatusOK}))
if want := false; got != want {
t.Errorf("Got = %v, want: %v", got, want)
}
if err == nil {
t.Error("Do did not return an error")
}
}
func TestBadURL(t *testing.T) {
_, err := Do(context.Background(), network.NewAutoTransport(), ":foo", ExpectsStatusCodes([]int{http.StatusOK}))
if err == nil {
t.Error("Do did not return an error")
}
t.Logf("For the curious the error was: %v", err)
}
func TestDoAsync(t *testing.T) {
// This replicates the TestDo.
ts := httptest.NewServer(http.HandlerFunc(probeServeFunc))
defer ts.Close()
wch := make(chan interface{})
defer close(wch)
tests := []struct {
name string
headerValue string
cb Done
}{{
name: "ok",
headerValue: systemName,
cb: func(arg interface{}, ret bool, err error) {
defer func() {
wch <- 42
}()
if got, want := arg.(string), "ok"; got != want {
t.Errorf("arg = %s, want: %s", got, want)
}
if !ret {
t.Error("result was false")
}
},
}, {
name: "wrong system",
headerValue: "bells-and-whistles",
cb: func(arg interface{}, ret bool, err error) {
defer func() {
wch <- 1984
}()
if ret {
t.Error("result was true")
}
},
}, {
name: "no header",
headerValue: "",
cb: func(arg interface{}, ret bool, err error) {
defer func() {
wch <- 2006
}()
if ret {
t.Error("result was true")
}
},
}}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
m := New(test.cb, network.NewProberTransport())
m.Offer(context.Background(), ts.URL, test.name, probeInterval, probeTimeout, WithHeader(network.ProbeHeaderName, test.headerValue), ExpectsBody(test.headerValue), ExpectsStatusCodes([]int{http.StatusOK}))
<-wch
})
}
}
type thirdTimesTheCharmProber struct {
calls int
}
func (t *thirdTimesTheCharmProber) ServeHTTP(w http.ResponseWriter, r *http.Request) {
t.calls++
if t.calls < 3 {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(unexpectedProbeMessage))
return
}
w.Write([]byte(systemName))
}
func TestDoAsyncRepeat(t *testing.T) {
c := &thirdTimesTheCharmProber{}
ts := httptest.NewServer(c)
defer ts.Close()
wch := make(chan interface{})
defer close(wch)
cb := func(arg interface{}, done bool, err error) {
if !done {
t.Error("done was false")
}
if err != nil {
t.Errorf("Unexpected error = %v", err)
}
wch <- arg
}
m := New(cb, network.NewProberTransport())
m.Offer(context.Background(), ts.URL, 42, probeInterval, probeTimeout, WithHeader(network.ProbeHeaderName, systemName), ExpectsBody(systemName), ExpectsStatusCodes([]int{http.StatusOK}))
<-wch
if got, want := c.calls, 3; got != want {
t.Errorf("Probe invocation count = %d, want: %d", got, want)
}
}
func TestDoAsyncTimeout(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
}))
defer ts.Close()
wch := make(chan interface{})
defer close(wch)
cb := func(arg interface{}, done bool, err error) {
if done {
t.Errorf("done was true")
}
if err != wait.ErrWaitTimeout {
t.Errorf("Unexpected error = %v", err)
}
wch <- arg
}
m := New(cb, network.NewProberTransport())
m.Offer(context.Background(), ts.URL, 2009, probeInterval, probeTimeout, ExpectsStatusCodes([]int{http.StatusOK}))
<-wch
}
func TestAsyncMultiple(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(probeServeFunc))
defer ts.Close()
wch := make(chan interface{})
defer close(wch)
cb := func(arg interface{}, done bool, err error) {
<-wch
wch <- 2006
}
m := New(cb, network.NewProberTransport())
if !m.Offer(context.Background(), ts.URL, 1984, probeInterval, probeTimeout, ExpectsStatusCodes([]int{http.StatusOK})) {
t.Error("First call to offer returned false")
}
if m.Offer(context.Background(), ts.URL, 1982, probeInterval, probeTimeout, ExpectsStatusCodes([]int{http.StatusOK})) {
t.Error("Second call to offer returned true")
}
if got, want := m.len(), 1; got != want {
t.Errorf("Number of queued items = %d, want: %d", got, want)
}
// Make sure we terminate the first probe.
wch <- 2009
<-wch
wait.PollImmediate(probeInterval, probeTimeout, func() (bool, error) {
return m.len() == 0, nil
})
if got, want := m.len(), 0; got != want {
t.Errorf("Number of queued items = %d, want: %d", got, want)
}
}
func TestWithHostOption(t *testing.T) {
host := "foobar.com"
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t.Logf("Want: %s, Got: %s\n", host, r.Host)
if r.Host != host {
w.WriteHeader(404)
}
}))
defer ts.Close()
tests := []struct {
name string
options []interface{}
success bool
expErr bool
}{{
name: "no hosts",
options: []interface{}{ExpectsStatusCodes([]int{http.StatusOK})},
success: false,
expErr: true,
}, {
name: "expected host",
options: []interface{}{WithHost(host), ExpectsStatusCodes([]int{http.StatusOK})},
success: true,
expErr: false,
}, {
name: "wrong host",
options: []interface{}{WithHost("nope.com"), ExpectsStatusCodes([]int{http.StatusOK})},
success: false,
expErr: true,
}}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ok, err := Do(context.Background(), network.AutoTransport, ts.URL, test.options...)
if ok != test.success {
t.Errorf("unexpected probe result: want: %v, got: %v", test.success, ok)
}
if err != nil && !test.expErr {
t.Errorf("Do() = %v, no error expected", err)
}
if err == nil && test.expErr {
t.Errorf("Do() = nil, expected an error")
}
})
}
}
func TestExpectsHeaderOption(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Foo", "Bar")
}))
defer ts.Close()
tests := []struct {
name string
options []interface{}
success bool
expErr bool
}{{
name: "header is present",
options: []interface{}{ExpectsHeader("Foo", "Bar"), ExpectsStatusCodes([]int{http.StatusOK})},
success: true,
}, {
name: "header is absent",
options: []interface{}{ExpectsHeader("Baz", "Nope"), ExpectsStatusCodes([]int{http.StatusOK})},
success: false,
expErr: true,
}, {
name: "header value doesn't match",
options: []interface{}{ExpectsHeader("Foo", "Baz"), ExpectsStatusCodes([]int{http.StatusOK})},
success: false,
expErr: true,
}}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ok, err := Do(context.Background(), network.AutoTransport, ts.URL, test.options...)
if ok != test.success {
t.Errorf("unexpected probe result: want: %v, got: %v", test.success, ok)
}
if err != nil && !test.expErr {
t.Errorf("Do() = %v, no error expected", err)
}
if err == nil && test.expErr {
t.Errorf("Do() = nil, expected an error")
}
})
}
}
func (m *Manager) len() int {
m.mu.Lock()
defer m.mu.Unlock()
return m.keys.Len()
}

120
network/transports.go Normal file
View File

@ -0,0 +1,120 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package network
import (
"context"
"errors"
"net"
"net/http"
"time"
"k8s.io/apimachinery/pkg/util/wait"
)
// RoundTripperFunc implementation roundtrips a request.
type RoundTripperFunc func(*http.Request) (*http.Response, error)
// RoundTrip implements http.RoundTripper.
func (rt RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
return rt(r)
}
func newAutoTransport(v1 http.RoundTripper, v2 http.RoundTripper) http.RoundTripper {
return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
t := v1
if r.ProtoMajor == 2 {
t = v2
}
return t.RoundTrip(r)
})
}
const sleepTO = 30 * time.Millisecond
var backOffTemplate = wait.Backoff{
Duration: 50 * time.Millisecond,
Factor: 1.4,
Jitter: 0.1, // At most 10% jitter.
Steps: 15,
}
var errDialTimeout = errors.New("timed out dialing")
// dialWithBackOff executes `net.Dialer.DialContext()` with exponentially increasing
// dial timeouts. In addition it sleeps with random jitter between tries.
func dialWithBackOff(ctx context.Context, network, address string) (net.Conn, error) {
return dialBackOffHelper(ctx, network, address, backOffTemplate, sleepTO)
}
func dialBackOffHelper(ctx context.Context, network, address string, bo wait.Backoff, sleep time.Duration) (net.Conn, error) {
dialer := &net.Dialer{
Timeout: bo.Duration, // Initial duration.
KeepAlive: 5 * time.Second,
DualStack: true,
}
for {
c, err := dialer.DialContext(ctx, network, address)
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
if bo.Steps < 1 {
break
}
dialer.Timeout = bo.Step()
time.Sleep(wait.Jitter(sleep, 1.0)) // Sleep with jitter.
continue
}
return nil, err
}
return c, nil
}
return nil, errDialTimeout
}
func newHTTPTransport(connTimeout time.Duration, disableKeepAlives bool) http.RoundTripper {
return &http.Transport{
// Those match net/http/transport.go
Proxy: http.ProxyFromEnvironment,
MaxIdleConns: 1000,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 5 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
DisableKeepAlives: disableKeepAlives,
// This is bespoke.
DialContext: dialWithBackOff,
}
}
// NewProberTransport creates a RoundTripper that is useful for probing,
// since it will not cache connections.
func NewProberTransport() http.RoundTripper {
return newAutoTransport(
newHTTPTransport(DefaultConnTimeout, true /*disable keep-alives*/),
NewH2CTransport())
}
// NewAutoTransport creates a RoundTripper that can use appropriate transport
// based on the request's HTTP version.
func NewAutoTransport() http.RoundTripper {
return newAutoTransport(
newHTTPTransport(DefaultConnTimeout, false /*disable keep-alives*/),
NewH2CTransport())
}
// AutoTransport uses h2c for HTTP2 requests and falls back to `http.DefaultTransport` for all others
var AutoTransport = NewAutoTransport()

104
network/transports_test.go Normal file
View File

@ -0,0 +1,104 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package network
import (
"context"
"net/http"
"net/http/httptest"
"strings"
"testing"
"k8s.io/apimachinery/pkg/util/sets"
)
func TestHTTPRoundTripper(t *testing.T) {
wants := sets.NewString()
frt := func(key string) http.RoundTripper {
return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
wants.Insert(key)
return nil, nil
})
}
rt := newAutoTransport(frt("v1"), frt("v2"))
examples := []struct {
label string
protoMajor int
want string
}{
{
label: "use default transport for HTTP1",
protoMajor: 1,
want: "v1",
},
{
label: "use h2c transport for HTTP2",
protoMajor: 2,
want: "v2",
},
{
label: "use default transport for all others",
protoMajor: 99,
want: "v1",
},
}
for _, e := range examples {
t.Run(e.label, func(t *testing.T) {
wants.Delete(e.want)
r := &http.Request{ProtoMajor: e.protoMajor}
rt.RoundTrip(r)
if !wants.Has(e.want) {
t.Error("Wrong transport selected for request.")
}
})
}
}
func TestDialWithBackoff(t *testing.T) {
// Nobody's listening on a random port. Usually.
c, err := dialWithBackOff(context.Background(), "tcp4", "127.0.0.1:41482")
if err == nil {
c.Close()
t.Error("Unexpected success dialing")
}
// Make the test short.
bo := backOffTemplate
bo.Steps = 2
// Timeout. Use special testing IP address.
c, err = dialBackOffHelper(context.Background(), "tcp4", "198.18.0.254:8888", bo, sleepTO)
if err == nil {
c.Close()
t.Error("Unexpected success dialing")
}
if err != errDialTimeout {
t.Errorf("Error = %v, want: %v", err, errDialTimeout)
}
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
defer s.Close()
c, err = dialWithBackOff(context.Background(), "tcp4", strings.TrimPrefix(s.URL, "http://"))
if err != nil {
t.Fatalf("dial error = %v, want nil", err)
}
c.Close()
}

495
vendor/golang.org/x/net/http2/h2c/h2c.go generated vendored Normal file
View File

@ -0,0 +1,495 @@
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package h2c implements the unencrypted "h2c" form of HTTP/2.
//
// The h2c protocol is the non-TLS version of HTTP/2 which is not available from
// net/http or golang.org/x/net/http2.
package h2c
import (
"bufio"
"bytes"
"encoding/base64"
"encoding/binary"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"net/textproto"
"os"
"strings"
"golang.org/x/net/http/httpguts"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
)
var (
http2VerboseLogs bool
)
func init() {
e := os.Getenv("GODEBUG")
if strings.Contains(e, "http2debug=1") || strings.Contains(e, "http2debug=2") {
http2VerboseLogs = true
}
}
// h2cHandler is a Handler which implements h2c by hijacking the HTTP/1 traffic
// that should be h2c traffic. There are two ways to begin a h2c connection
// (RFC 7540 Section 3.2 and 3.4): (1) Starting with Prior Knowledge - this
// works by starting an h2c connection with a string of bytes that is valid
// HTTP/1, but unlikely to occur in practice and (2) Upgrading from HTTP/1 to
// h2c - this works by using the HTTP/1 Upgrade header to request an upgrade to
// h2c. When either of those situations occur we hijack the HTTP/1 connection,
// convert it to a HTTP/2 connection and pass the net.Conn to http2.ServeConn.
type h2cHandler struct {
Handler http.Handler
s *http2.Server
}
// NewHandler returns an http.Handler that wraps h, intercepting any h2c
// traffic. If a request is an h2c connection, it's hijacked and redirected to
// s.ServeConn. Otherwise the returned Handler just forwards requests to h. This
// works because h2c is designed to be parseable as valid HTTP/1, but ignored by
// any HTTP server that does not handle h2c. Therefore we leverage the HTTP/1
// compatible parts of the Go http library to parse and recognize h2c requests.
// Once a request is recognized as h2c, we hijack the connection and convert it
// to an HTTP/2 connection which is understandable to s.ServeConn. (s.ServeConn
// understands HTTP/2 except for the h2c part of it.)
func NewHandler(h http.Handler, s *http2.Server) http.Handler {
return &h2cHandler{
Handler: h,
s: s,
}
}
// ServeHTTP implement the h2c support that is enabled by h2c.GetH2CHandler.
func (s h2cHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Handle h2c with prior knowledge (RFC 7540 Section 3.4)
if r.Method == "PRI" && len(r.Header) == 0 && r.URL.Path == "*" && r.Proto == "HTTP/2.0" {
if http2VerboseLogs {
log.Print("h2c: attempting h2c with prior knowledge.")
}
conn, err := initH2CWithPriorKnowledge(w)
if err != nil {
if http2VerboseLogs {
log.Printf("h2c: error h2c with prior knowledge: %v", err)
}
return
}
defer conn.Close()
s.s.ServeConn(conn, &http2.ServeConnOpts{Handler: s.Handler})
return
}
// Handle Upgrade to h2c (RFC 7540 Section 3.2)
if conn, err := h2cUpgrade(w, r); err == nil {
defer conn.Close()
s.s.ServeConn(conn, &http2.ServeConnOpts{Handler: s.Handler})
return
}
s.Handler.ServeHTTP(w, r)
return
}
// initH2CWithPriorKnowledge implements creating a h2c connection with prior
// knowledge (Section 3.4) and creates a net.Conn suitable for http2.ServeConn.
// All we have to do is look for the client preface that is suppose to be part
// of the body, and reforward the client preface on the net.Conn this function
// creates.
func initH2CWithPriorKnowledge(w http.ResponseWriter) (net.Conn, error) {
hijacker, ok := w.(http.Hijacker)
if !ok {
panic("Hijack not supported.")
}
conn, rw, err := hijacker.Hijack()
if err != nil {
panic(fmt.Sprintf("Hijack failed: %v", err))
}
const expectedBody = "SM\r\n\r\n"
buf := make([]byte, len(expectedBody))
n, err := io.ReadFull(rw, buf)
if err != nil {
return nil, fmt.Errorf("could not read from the buffer: %s", err)
}
if string(buf[:n]) == expectedBody {
c := &rwConn{
Conn: conn,
Reader: io.MultiReader(strings.NewReader(http2.ClientPreface), rw),
BufWriter: rw.Writer,
}
return c, nil
}
conn.Close()
if http2VerboseLogs {
log.Printf(
"h2c: missing the request body portion of the client preface. Wanted: %v Got: %v",
[]byte(expectedBody),
buf[0:n],
)
}
return nil, errors.New("invalid client preface")
}
// drainClientPreface reads a single instance of the HTTP/2 client preface from
// the supplied reader.
func drainClientPreface(r io.Reader) error {
var buf bytes.Buffer
prefaceLen := int64(len(http2.ClientPreface))
n, err := io.CopyN(&buf, r, prefaceLen)
if err != nil {
return err
}
if n != prefaceLen || buf.String() != http2.ClientPreface {
return fmt.Errorf("Client never sent: %s", http2.ClientPreface)
}
return nil
}
// h2cUpgrade establishes a h2c connection using the HTTP/1 upgrade (Section 3.2).
func h2cUpgrade(w http.ResponseWriter, r *http.Request) (net.Conn, error) {
if !isH2CUpgrade(r.Header) {
return nil, errors.New("non-conforming h2c headers")
}
// Initial bytes we put into conn to fool http2 server
initBytes, _, err := convertH1ReqToH2(r)
if err != nil {
return nil, err
}
hijacker, ok := w.(http.Hijacker)
if !ok {
return nil, errors.New("hijack not supported.")
}
conn, rw, err := hijacker.Hijack()
if err != nil {
return nil, fmt.Errorf("hijack failed: %v", err)
}
rw.Write([]byte("HTTP/1.1 101 Switching Protocols\r\n" +
"Connection: Upgrade\r\n" +
"Upgrade: h2c\r\n\r\n"))
rw.Flush()
// A conforming client will now send an H2 client preface which need to drain
// since we already sent this.
if err := drainClientPreface(rw); err != nil {
return nil, err
}
c := &rwConn{
Conn: conn,
Reader: io.MultiReader(initBytes, rw),
BufWriter: newSettingsAckSwallowWriter(rw.Writer),
}
return c, nil
}
// convert the data contained in the HTTP/1 upgrade request into the HTTP/2
// version in byte form.
func convertH1ReqToH2(r *http.Request) (*bytes.Buffer, []http2.Setting, error) {
h2Bytes := bytes.NewBuffer([]byte((http2.ClientPreface)))
framer := http2.NewFramer(h2Bytes, nil)
settings, err := getH2Settings(r.Header)
if err != nil {
return nil, nil, err
}
if err := framer.WriteSettings(settings...); err != nil {
return nil, nil, err
}
headerBytes, err := getH2HeaderBytes(r, getMaxHeaderTableSize(settings))
if err != nil {
return nil, nil, err
}
maxFrameSize := int(getMaxFrameSize(settings))
needOneHeader := len(headerBytes) < maxFrameSize
err = framer.WriteHeaders(http2.HeadersFrameParam{
StreamID: 1,
BlockFragment: headerBytes,
EndHeaders: needOneHeader,
})
if err != nil {
return nil, nil, err
}
for i := maxFrameSize; i < len(headerBytes); i += maxFrameSize {
if len(headerBytes)-i > maxFrameSize {
if err := framer.WriteContinuation(1,
false, // endHeaders
headerBytes[i:maxFrameSize]); err != nil {
return nil, nil, err
}
} else {
if err := framer.WriteContinuation(1,
true, // endHeaders
headerBytes[i:]); err != nil {
return nil, nil, err
}
}
}
return h2Bytes, settings, nil
}
// getMaxFrameSize returns the SETTINGS_MAX_FRAME_SIZE. If not present default
// value is 16384 as specified by RFC 7540 Section 6.5.2.
func getMaxFrameSize(settings []http2.Setting) uint32 {
for _, setting := range settings {
if setting.ID == http2.SettingMaxFrameSize {
return setting.Val
}
}
return 16384
}
// getMaxHeaderTableSize returns the SETTINGS_HEADER_TABLE_SIZE. If not present
// default value is 4096 as specified by RFC 7540 Section 6.5.2.
func getMaxHeaderTableSize(settings []http2.Setting) uint32 {
for _, setting := range settings {
if setting.ID == http2.SettingHeaderTableSize {
return setting.Val
}
}
return 4096
}
// bufWriter is a Writer interface that also has a Flush method.
type bufWriter interface {
io.Writer
Flush() error
}
// rwConn implements net.Conn but overrides Read and Write so that reads and
// writes are forwarded to the provided io.Reader and bufWriter.
type rwConn struct {
net.Conn
io.Reader
BufWriter bufWriter
}
// Read forwards reads to the underlying Reader.
func (c *rwConn) Read(p []byte) (int, error) {
return c.Reader.Read(p)
}
// Write forwards writes to the underlying bufWriter and immediately flushes.
func (c *rwConn) Write(p []byte) (int, error) {
n, err := c.BufWriter.Write(p)
if err := c.BufWriter.Flush(); err != nil {
return 0, err
}
return n, err
}
// settingsAckSwallowWriter is a writer that normally forwards bytes to its
// underlying Writer, but swallows the first SettingsAck frame that it sees.
type settingsAckSwallowWriter struct {
Writer *bufio.Writer
buf []byte
didSwallow bool
}
// newSettingsAckSwallowWriter returns a new settingsAckSwallowWriter.
func newSettingsAckSwallowWriter(w *bufio.Writer) *settingsAckSwallowWriter {
return &settingsAckSwallowWriter{
Writer: w,
buf: make([]byte, 0),
didSwallow: false,
}
}
// Write implements io.Writer interface. Normally forwards bytes to w.Writer,
// except for the first Settings ACK frame that it sees.
func (w *settingsAckSwallowWriter) Write(p []byte) (int, error) {
if !w.didSwallow {
w.buf = append(w.buf, p...)
// Process all the frames we have collected into w.buf
for {
// Append until we get full frame header which is 9 bytes
if len(w.buf) < 9 {
break
}
// Check if we have collected a whole frame.
fh, err := http2.ReadFrameHeader(bytes.NewBuffer(w.buf))
if err != nil {
// Corrupted frame, fail current Write
return 0, err
}
fSize := fh.Length + 9
if uint32(len(w.buf)) < fSize {
// Have not collected whole frame. Stop processing buf, and withold on
// forward bytes to w.Writer until we get the full frame.
break
}
// We have now collected a whole frame.
if fh.Type == http2.FrameSettings && fh.Flags.Has(http2.FlagSettingsAck) {
// If Settings ACK frame, do not forward to underlying writer, remove
// bytes from w.buf, and record that we have swallowed Settings Ack
// frame.
w.didSwallow = true
w.buf = w.buf[fSize:]
continue
}
// Not settings ack frame. Forward bytes to w.Writer.
if _, err := w.Writer.Write(w.buf[:fSize]); err != nil {
// Couldn't forward bytes. Fail current Write.
return 0, err
}
w.buf = w.buf[fSize:]
}
return len(p), nil
}
return w.Writer.Write(p)
}
// Flush calls w.Writer.Flush.
func (w *settingsAckSwallowWriter) Flush() error {
return w.Writer.Flush()
}
// isH2CUpgrade returns true if the header properly request an upgrade to h2c
// as specified by Section 3.2.
func isH2CUpgrade(h http.Header) bool {
return httpguts.HeaderValuesContainsToken(h[textproto.CanonicalMIMEHeaderKey("Upgrade")], "h2c") &&
httpguts.HeaderValuesContainsToken(h[textproto.CanonicalMIMEHeaderKey("Connection")], "HTTP2-Settings")
}
// getH2Settings returns the []http2.Setting that are encoded in the
// HTTP2-Settings header.
func getH2Settings(h http.Header) ([]http2.Setting, error) {
vals, ok := h[textproto.CanonicalMIMEHeaderKey("HTTP2-Settings")]
if !ok {
return nil, errors.New("missing HTTP2-Settings header")
}
if len(vals) != 1 {
return nil, fmt.Errorf("expected 1 HTTP2-Settings. Got: %v", vals)
}
settings, err := decodeSettings(vals[0])
if err != nil {
return nil, fmt.Errorf("Invalid HTTP2-Settings: %q", vals[0])
}
return settings, nil
}
// decodeSettings decodes the base64url header value of the HTTP2-Settings
// header. RFC 7540 Section 3.2.1.
func decodeSettings(headerVal string) ([]http2.Setting, error) {
b, err := base64.RawURLEncoding.DecodeString(headerVal)
if err != nil {
return nil, err
}
if len(b)%6 != 0 {
return nil, err
}
settings := make([]http2.Setting, 0)
for i := 0; i < len(b)/6; i++ {
settings = append(settings, http2.Setting{
ID: http2.SettingID(binary.BigEndian.Uint16(b[i*6 : i*6+2])),
Val: binary.BigEndian.Uint32(b[i*6+2 : i*6+6]),
})
}
return settings, nil
}
// getH2HeaderBytes return the headers in r a []bytes encoded by HPACK.
func getH2HeaderBytes(r *http.Request, maxHeaderTableSize uint32) ([]byte, error) {
headerBytes := bytes.NewBuffer(nil)
hpackEnc := hpack.NewEncoder(headerBytes)
hpackEnc.SetMaxDynamicTableSize(maxHeaderTableSize)
// Section 8.1.2.3
err := hpackEnc.WriteField(hpack.HeaderField{
Name: ":method",
Value: r.Method,
})
if err != nil {
return nil, err
}
err = hpackEnc.WriteField(hpack.HeaderField{
Name: ":scheme",
Value: "http",
})
if err != nil {
return nil, err
}
err = hpackEnc.WriteField(hpack.HeaderField{
Name: ":authority",
Value: r.Host,
})
if err != nil {
return nil, err
}
path := r.URL.Path
if r.URL.RawQuery != "" {
path = strings.Join([]string{path, r.URL.RawQuery}, "?")
}
err = hpackEnc.WriteField(hpack.HeaderField{
Name: ":path",
Value: path,
})
if err != nil {
return nil, err
}
// TODO Implement Section 8.3
for header, values := range r.Header {
// Skip non h2 headers
if isNonH2Header(header) {
continue
}
for _, v := range values {
err := hpackEnc.WriteField(hpack.HeaderField{
Name: strings.ToLower(header),
Value: v,
})
if err != nil {
return nil, err
}
}
}
return headerBytes.Bytes(), nil
}
// Connection specific headers listed in RFC 7540 Section 8.1.2.2 that are not
// suppose to be transferred to HTTP/2. The Http2-Settings header is skipped
// since already use to create the HTTP/2 SETTINGS frame.
var nonH2Headers = []string{
"Connection",
"Keep-Alive",
"Proxy-Connection",
"Transfer-Encoding",
"Upgrade",
"Http2-Settings",
}
// isNonH2Header returns true if header should not be transferred to HTTP/2.
func isNonH2Header(header string) bool {
for _, nonH2h := range nonH2Headers {
if header == nonH2h {
return true
}
}
return false
}