feat: custom default HTTP transport (#711)

* src: dialer pod auto-exit

The pod should exit (be Completed) when func exits.

Signed-off-by: Matej Vasek <mvasek@redhat.com>

* feat: use in cluster dialer by default

This sets http.DefaultTransport to our specialized RoundTripper.
The RoundTripper first tries dial in standard way.
If dial operation fails due to hostname resolution error
in cluster dialer will be used.

Signed-off-by: Matej Vasek <mvasek@redhat.com>
This commit is contained in:
Matej Vasek 2021-12-10 11:31:29 +01:00 committed by GitHub
parent 49883545c8
commit a13f897fbb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 161 additions and 11 deletions

View File

@ -2,10 +2,13 @@ package main
import (
"context"
"net/http"
"os"
"os/signal"
"syscall"
funcHttp "knative.dev/kn-plugin-func/http"
"knative.dev/kn-plugin-func/cmd"
)
@ -14,6 +17,11 @@ import (
var date, vers, hash string
func main() {
rt := funcHttp.NewRoundTripper()
http.DefaultTransport = rt
defer rt.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

92
http/transport.go Normal file
View File

@ -0,0 +1,92 @@
package http
import (
"context"
"errors"
"io"
"net"
"net/http"
"sync"
"time"
"knative.dev/kn-plugin-func/k8s"
)
type RoundTripCloser interface {
http.RoundTripper
io.Closer
}
// NewRoundTripper returns new closable RoundTripper that first tries to dial connection in standard way,
// if the dial operation fails due to hostname resolution the RoundTripper tries to dial from in cluster pod.
//
// This is useful for accessing cluster internal services (pushing a CloudEvent into Knative broker).
func NewRoundTripper() RoundTripCloser {
d := &dialer{
netDialer: net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
},
}
return &roundTripCloser{
Transport: http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: d.DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
d: d,
}
}
type roundTripCloser struct {
http.Transport
d *dialer
}
func (r *roundTripCloser) Close() error {
return r.d.Close()
}
type dialer struct {
o sync.Once
netDialer net.Dialer
inClusterDialer k8s.ContextDialer
}
func (d *dialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
conn, err := d.netDialer.DialContext(ctx, network, address)
if err == nil {
return conn, nil
}
var dnsErr *net.DNSError
if !(errors.As(err, &dnsErr) && dnsErr.IsNotFound) {
return nil, err
}
err = nil
d.o.Do(func() {
d.inClusterDialer, err = k8s.NewInClusterDialer(ctx)
})
if err != nil {
return nil, err
}
if d.inClusterDialer == nil {
return nil, errors.New("failed to init in cluster dialer")
}
return d.inClusterDialer.DialContext(ctx, network, address)
}
func (d *dialer) Close() error {
if d.inClusterDialer != nil {
return d.inClusterDialer.Close()
}
return nil
}

View File

@ -49,7 +49,9 @@ type ContextDialer interface {
// Transport: transport,
// }
func NewInClusterDialer(ctx context.Context) (ContextDialer, error) {
c := &contextDialer{}
c := &contextDialer{
detachChan: make(chan struct{}),
}
err := c.startDialerPod(ctx)
if err != nil {
return nil, err
@ -58,10 +60,11 @@ func NewInClusterDialer(ctx context.Context) (ContextDialer, error) {
}
type contextDialer struct {
coreV1 v1.CoreV1Interface
restConf *restclient.Config
podName string
namespace string
coreV1 v1.CoreV1Interface
restConf *restclient.Config
podName string
namespace string
detachChan chan struct{}
}
func (c *contextDialer) DialContext(ctx context.Context, network string, addr string) (net.Conn, error) {
@ -87,6 +90,9 @@ func (c *contextDialer) DialContext(ctx context.Context, network string, addr st
}
func (c *contextDialer) Close() error {
// closing the channel will cause stdin of the attached container to return EOF
// as a result the pod exits -- it transits to Completed state
close(c.detachChan)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*1)
defer cancel()
delOpts := metaV1.DeleteOptions{}
@ -136,10 +142,11 @@ func (c *contextDialer) startDialerPod(ctx context.Context) (err error) {
Spec: coreV1.PodSpec{
Containers: []coreV1.Container{
{
Name: c.podName,
Image: socatImage,
Stdin: true,
Command: []string{"sleep", "infinity"},
Name: c.podName,
Image: socatImage,
Stdin: true,
StdinOnce: true,
Args: []string{"-u", "-", "OPEN:/dev/null,append"},
},
},
DNSPolicy: coreV1.DNSClusterFirst,
@ -164,10 +171,24 @@ func (c *contextDialer) startDialerPod(ctx context.Context) (err error) {
}
if err != nil {
err = fmt.Errorf("failed to start dialer container: %w", err)
return fmt.Errorf("failed to start dialer container: %w", err)
}
return err
// attaching to the stdin to automatically Complete the pod on exit
go func() {
_ = c.attach(emptyBlockingReader(c.detachChan), io.Discard, io.Discard)
}()
return nil
}
// reader that returns no data and blocks until
// the channel is closed or data are sent to the channel
type emptyBlockingReader chan struct{}
func (e emptyBlockingReader) Read(p []byte) (n int, err error) {
<-e
return 0, io.EOF
}
func (c *contextDialer) exec(hostPort string, in io.Reader, out, errOut io.Writer) error {
@ -200,6 +221,35 @@ func (c *contextDialer) exec(hostPort string, in io.Reader, out, errOut io.Write
})
}
func (c *contextDialer) attach(in io.Reader, out, errOut io.Writer) error {
restClient := c.coreV1.RESTClient()
req := restClient.Post().
Resource("pods").
Name(c.podName).
Namespace(c.namespace).
SubResource("attach")
req.VersionedParams(&coreV1.PodAttachOptions{
Container: c.podName,
Stdin: true,
Stdout: true,
Stderr: true,
TTY: false,
}, scheme.ParameterCodec)
executor, err := remotecommand.NewSPDYExecutor(c.restConf, "POST", req.URL())
if err != nil {
return err
}
return executor.Stream(remotecommand.StreamOptions{
Stdin: in,
Stdout: out,
Stderr: errOut,
Tty: false,
})
}
func (c *contextDialer) podReady(ctx context.Context) (errChan <-chan error) {
d := make(chan error)
errChan = d