feat: support dump http content in client for debugging (#770)

* feat: support dump http content in client for debugging

Signed-off-by: Jim Ma <majinjing3@gmail.com>
This commit is contained in:
Jim Ma 2021-10-29 16:55:33 +08:00 committed by Gaius
parent 54e79fa1ed
commit 6546115359
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
9 changed files with 131 additions and 44 deletions

View File

@ -181,12 +181,12 @@ build-dfget-man-page:
# Run unittests # Run unittests
test: test:
@go test -gcflags "all=-l" -race -short ${PKG_LIST} @go test -v -gcflags "all=-l" -race -short ${PKG_LIST}
.PHONY: test .PHONY: test
# Run tests with coverage # Run tests with coverage
test-coverage: test-coverage:
@go test -gcflags "all=-l" -race -short ${PKG_LIST} -coverprofile cover.out -covermode=atomic @go test -v -gcflags "all=-l" -race -short ${PKG_LIST} -coverprofile cover.out -covermode=atomic
@cat cover.out >> coverage.txt @cat cover.out >> coverage.txt
.PHONY: test-coverage .PHONY: test-coverage

View File

@ -152,14 +152,15 @@ type DownloadOption struct {
type ProxyOption struct { type ProxyOption struct {
// WARNING: when add more option, please update ProxyOption.unmarshal function // WARNING: when add more option, please update ProxyOption.unmarshal function
ListenOption `mapstructure:",squash" yaml:",inline"` ListenOption `mapstructure:",squash" yaml:",inline"`
BasicAuth *BasicAuth `mapstructure:"basicAuth" yaml:"basicAuth"` BasicAuth *BasicAuth `mapstructure:"basicAuth" yaml:"basicAuth"`
DefaultFilter string `mapstructure:"defaultFilter" yaml:"defaultFilter"` DefaultFilter string `mapstructure:"defaultFilter" yaml:"defaultFilter"`
MaxConcurrency int64 `mapstructure:"maxConcurrency" yaml:"maxConcurrency"` MaxConcurrency int64 `mapstructure:"maxConcurrency" yaml:"maxConcurrency"`
RegistryMirror *RegistryMirror `mapstructure:"registryMirror" yaml:"registryMirror"` RegistryMirror *RegistryMirror `mapstructure:"registryMirror" yaml:"registryMirror"`
WhiteList []*WhiteList `mapstructure:"whiteList" yaml:"whiteList"` WhiteList []*WhiteList `mapstructure:"whiteList" yaml:"whiteList"`
Proxies []*Proxy `mapstructure:"proxies" yaml:"proxies"` Proxies []*Proxy `mapstructure:"proxies" yaml:"proxies"`
HijackHTTPS *HijackConfig `mapstructure:"hijackHTTPS" yaml:"hijackHTTPS"` HijackHTTPS *HijackConfig `mapstructure:"hijackHTTPS" yaml:"hijackHTTPS"`
DumpHTTPContent bool `mapstructure:"dumpHTTPContent" yaml:"dumpHTTPContent"`
} }
func (p *ProxyOption) UnmarshalJSON(b []byte) error { func (p *ProxyOption) UnmarshalJSON(b []byte) error {
@ -236,14 +237,15 @@ func (p *ProxyOption) UnmarshalYAML(node *yaml.Node) error {
func (p *ProxyOption) unmarshal(unmarshal func(in []byte, out interface{}) (err error), b []byte) error { func (p *ProxyOption) unmarshal(unmarshal func(in []byte, out interface{}) (err error), b []byte) error {
pt := struct { pt := struct {
ListenOption `mapstructure:",squash" yaml:",inline"` ListenOption `mapstructure:",squash" yaml:",inline"`
BasicAuth *BasicAuth `mapstructure:"basicAuth" yaml:"basicAuth"` BasicAuth *BasicAuth `mapstructure:"basicAuth" yaml:"basicAuth"`
DefaultFilter string `mapstructure:"defaultFilter" yaml:"defaultFilter"` DefaultFilter string `mapstructure:"defaultFilter" yaml:"defaultFilter"`
MaxConcurrency int64 `mapstructure:"maxConcurrency" yaml:"maxConcurrency"` MaxConcurrency int64 `mapstructure:"maxConcurrency" yaml:"maxConcurrency"`
RegistryMirror *RegistryMirror `mapstructure:"registryMirror" yaml:"registryMirror"` RegistryMirror *RegistryMirror `mapstructure:"registryMirror" yaml:"registryMirror"`
WhiteList []*WhiteList `mapstructure:"whiteList" yaml:"whiteList"` WhiteList []*WhiteList `mapstructure:"whiteList" yaml:"whiteList"`
Proxies []*Proxy `mapstructure:"proxies" yaml:"proxies"` Proxies []*Proxy `mapstructure:"proxies" yaml:"proxies"`
HijackHTTPS *HijackConfig `mapstructure:"hijackHTTPS" yaml:"hijackHTTPS"` HijackHTTPS *HijackConfig `mapstructure:"hijackHTTPS" yaml:"hijackHTTPS"`
DumpHTTPContent bool `mapstructure:"dumpHTTPContent" yaml:"dumpHTTPContent"`
}{} }{}
if err := unmarshal(b, &pt); err != nil { if err := unmarshal(b, &pt); err != nil {
@ -258,6 +260,7 @@ func (p *ProxyOption) unmarshal(unmarshal func(in []byte, out interface{}) (err
p.MaxConcurrency = pt.MaxConcurrency p.MaxConcurrency = pt.MaxConcurrency
p.DefaultFilter = pt.DefaultFilter p.DefaultFilter = pt.DefaultFilter
p.BasicAuth = pt.BasicAuth p.BasicAuth = pt.BasicAuth
p.DumpHTTPContent = pt.DumpHTTPContent
return nil return nil
} }

View File

@ -70,7 +70,7 @@ type Proxy struct {
// cert is the certificate used to hijack https proxy requests // cert is the certificate used to hijack https proxy requests
cert *tls.Certificate cert *tls.Certificate
// certCache is a in-memory cache store for TLS certs used in HTTPS hijack. Lazy init. // certCache is an in-memory cache store for TLS certs used in HTTPS hijack. Lazy init.
certCache *lru.Cache certCache *lru.Cache
// directHandler are used to handle non-proxy requests // directHandler are used to handle non-proxy requests
@ -95,6 +95,9 @@ type Proxy struct {
tracer trace.Tracer tracer trace.Tracer
basicAuth *config.BasicAuth basicAuth *config.BasicAuth
// dumpHTTPContent indicates to dump http request header and response header
dumpHTTPContent bool
} }
// Option is a functional option for configuring the proxy // Option is a functional option for configuring the proxy
@ -201,6 +204,13 @@ func WithBasicAuth(auth *config.BasicAuth) Option {
} }
} }
func WithDumpHTTPContent(dump bool) Option {
return func(p *Proxy) *Proxy {
p.dumpHTTPContent = dump
return p
}
}
// NewProxy returns a new transparent proxy from the given options // NewProxy returns a new transparent proxy from the given options
func NewProxy(options ...Option) (*Proxy, error) { func NewProxy(options ...Option) (*Proxy, error) {
return NewProxyWithOptions(options...) return NewProxyWithOptions(options...)
@ -334,12 +344,14 @@ func (proxy *Proxy) handleHTTP(span trace.Span, w http.ResponseWriter, req *http
func (proxy *Proxy) handleHTTPS(w http.ResponseWriter, r *http.Request) { func (proxy *Proxy) handleHTTPS(w http.ResponseWriter, r *http.Request) {
if proxy.cert == nil { if proxy.cert == nil {
logger.Debugf("proxy cert is not configured, tunneling https request for %s", r.Host)
tunnelHTTPS(w, r) tunnelHTTPS(w, r)
return return
} }
cConfig := proxy.remoteConfig(r.Host) cConfig := proxy.remoteConfig(r.Host)
if cConfig == nil { if cConfig == nil {
logger.Debugf("hijackHTTPS hosts not match, tunneling https request for %s", r.Host)
tunnelHTTPS(w, r) tunnelHTTPS(w, r)
return return
} }
@ -394,9 +406,16 @@ func (proxy *Proxy) handleHTTPS(w http.ResponseWriter, r *http.Request) {
cConn.Close() cConn.Close()
rp := &httputil.ReverseProxy{ rp := &httputil.ReverseProxy{
Director: func(r *http.Request) { Director: func(req *http.Request) {
r.URL.Host = r.Host req.URL.Host = req.Host
r.URL.Scheme = schemaHTTPS req.URL.Scheme = schemaHTTPS
if proxy.dumpHTTPContent {
if out, e := httputil.DumpRequest(req, false); e == nil {
logger.Debugf("dump request in ReverseProxy: %s", string(out))
} else {
logger.Errorf("dump request in ReverseProxy error: %s", e)
}
}
}, },
Transport: proxy.newTransport(cConfig), Transport: proxy.newTransport(cConfig),
} }
@ -405,7 +424,8 @@ func (proxy *Proxy) handleHTTPS(w http.ResponseWriter, r *http.Request) {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(1) wg.Add(1)
// NOTE: http.Serve always returns a non-nil error // NOTE: http.Serve always returns a non-nil error
if err := http.Serve(&singleUseListener{&customCloseConn{sConn, wg.Done}}, rp); err != errServerClosed && err != http.ErrServerClosed { err = http.Serve(&singleUseListener{&customCloseConn{sConn, wg.Done}}, rp)
if err != errServerClosed && err != http.ErrServerClosed {
logger.Errorf("failed to accept incoming HTTP connections: %v", err) logger.Errorf("failed to accept incoming HTTP connections: %v", err)
} }
wg.Wait() wg.Wait()
@ -419,6 +439,7 @@ func (proxy *Proxy) newTransport(tlsConfig *tls.Config) http.RoundTripper {
transport.WithCondition(proxy.shouldUseDragonfly), transport.WithCondition(proxy.shouldUseDragonfly),
transport.WithDefaultFilter(proxy.defaultFilter), transport.WithDefaultFilter(proxy.defaultFilter),
transport.WithDefaultBiz(bizTag), transport.WithDefaultBiz(bizTag),
transport.WithDumpHTTPContent(proxy.dumpHTTPContent),
) )
return rt return rt
} }
@ -432,6 +453,7 @@ func (proxy *Proxy) mirrorRegistry(w http.ResponseWriter, r *http.Request) {
transport.WithCondition(proxy.shouldUseDragonflyForMirror), transport.WithCondition(proxy.shouldUseDragonflyForMirror),
transport.WithDefaultFilter(proxy.defaultFilter), transport.WithDefaultFilter(proxy.defaultFilter),
transport.WithDefaultBiz(bizTag), transport.WithDefaultBiz(bizTag),
transport.WithDumpHTTPContent(proxy.dumpHTTPContent),
) )
if err != nil { if err != nil {
http.Error(w, fmt.Sprintf("failed to get transport: %v", err), http.StatusInternalServerError) http.Error(w, fmt.Sprintf("failed to get transport: %v", err), http.StatusInternalServerError)
@ -529,7 +551,6 @@ func (proxy *Proxy) shouldUseDragonflyForMirror(req *http.Request) bool {
// tunnelHTTPS handles a CONNECT request and proxy an https request through an // tunnelHTTPS handles a CONNECT request and proxy an https request through an
// http tunnel. // http tunnel.
func tunnelHTTPS(w http.ResponseWriter, r *http.Request) { func tunnelHTTPS(w http.ResponseWriter, r *http.Request) {
logger.Debugf("Tunneling https request for %s", r.Host)
dst, err := net.DialTimeout("tcp", r.Host, 10*time.Second) dst, err := net.DialTimeout("tcp", r.Host, 10*time.Second)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusServiceUnavailable) http.Error(w, err.Error(), http.StatusServiceUnavailable)

View File

@ -70,6 +70,7 @@ func NewProxyManager(peerHost *scheduler.PeerHost, peerTaskManager peer.TaskMana
WithMaxConcurrency(opts.MaxConcurrency), WithMaxConcurrency(opts.MaxConcurrency),
WithDefaultFilter(opts.DefaultFilter), WithDefaultFilter(opts.DefaultFilter),
WithBasicAuth(opts.BasicAuth), WithBasicAuth(opts.BasicAuth),
WithDumpHTTPContent(opts.DumpHTTPContent),
} }
if registry != nil { if registry != nil {

View File

@ -108,11 +108,18 @@ func (proxy *Proxy) handleTLSConn(clientConn net.Conn, port int) {
defer tlsConn.Close() defer tlsConn.Close()
rp := &httputil.ReverseProxy{ rp := &httputil.ReverseProxy{
Director: func(r *http.Request) { Director: func(req *http.Request) {
r.URL.Scheme = schemaHTTPS req.URL.Scheme = schemaHTTPS
r.URL.Host = serverName req.URL.Host = serverName
if port != portHTTPS { if port != portHTTPS {
r.URL.Host = fmt.Sprintf("%s:%d", serverName, port) req.URL.Host = fmt.Sprintf("%s:%d", serverName, port)
}
if proxy.dumpHTTPContent {
if out, e := httputil.DumpRequest(req, false); e == nil {
logger.Debugf("dump request in SNI ReverseProxy: %s", string(out))
} else {
logger.Errorf("dump request in SNI ReverseProxy error: %s", e)
}
} }
}, },
Transport: proxy.newTransport(proxy.remoteConfig(serverName)), Transport: proxy.newTransport(proxy.remoteConfig(serverName)),
@ -122,7 +129,8 @@ func (proxy *Proxy) handleTLSConn(clientConn net.Conn, port int) {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(1) wg.Add(1)
// NOTE: http.Serve always returns a non-nil error // NOTE: http.Serve always returns a non-nil error
if err := http.Serve(&singleUseListener{&customCloseConn{tlsConn, wg.Done}}, rp); err != errServerClosed && err != http.ErrServerClosed { err = http.Serve(&singleUseListener{&customCloseConn{tlsConn, wg.Done}}, rp)
if err != errServerClosed && err != http.ErrServerClosed {
logger.Errorf("failed to accept incoming HTTPS connections: %v", err) logger.Errorf("failed to accept incoming HTTPS connections: %v", err)
} }
wg.Wait() wg.Wait()

View File

@ -26,12 +26,13 @@ import (
"sync" "sync"
"time" "time"
"go.uber.org/atomic"
"d7y.io/dragonfly/v2/client/clientutil" "d7y.io/dragonfly/v2/client/clientutil"
"d7y.io/dragonfly/v2/internal/dferrors" "d7y.io/dragonfly/v2/internal/dferrors"
logger "d7y.io/dragonfly/v2/internal/dflog" logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/util/digestutils" "d7y.io/dragonfly/v2/pkg/util/digestutils"
"go.uber.org/atomic"
) )
type localTaskStore struct { type localTaskStore struct {
@ -342,19 +343,19 @@ func (t *localTaskStore) reclaimData(sLogger *logger.SugaredLoggerOnWith) error
if stat.Mode()&os.ModeSymlink == os.ModeSymlink { if stat.Mode()&os.ModeSymlink == os.ModeSymlink {
dest, err0 := os.Readlink(data) dest, err0 := os.Readlink(data)
if err0 == nil { if err0 == nil {
if err = os.Remove(dest); err != nil { if err = os.Remove(dest); err != nil && !os.IsNotExist(err) {
sLogger.Warnf("remove symlink target file %s error: %s", dest, err) sLogger.Warnf("remove symlink target file %s error: %s", dest, err)
} else { } else {
sLogger.Infof("remove data file %s", dest) sLogger.Infof("remove data file %s", dest)
} }
} }
} else { // remove cache file } else { // remove cache file
if err = os.Remove(t.DataFilePath); err != nil { if err = os.Remove(t.DataFilePath); err != nil && !os.IsNotExist(err) {
sLogger.Errorf("remove data file %s error: %s", data, err) sLogger.Errorf("remove data file %s error: %s", data, err)
return err return err
} }
} }
if err = os.Remove(data); err != nil { if err = os.Remove(data); err != nil && !os.IsNotExist(err) {
sLogger.Errorf("remove data file %s error: %s", data, err) sLogger.Errorf("remove data file %s error: %s", data, err)
return err return err
} }

View File

@ -21,9 +21,13 @@ import (
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
"net/http/httputil"
"regexp" "regexp"
"strconv"
"time" "time"
"github.com/go-http-utils/headers"
"d7y.io/dragonfly/v2/client/clientutil" "d7y.io/dragonfly/v2/client/clientutil"
"d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/peer" "d7y.io/dragonfly/v2/client/daemon/peer"
@ -61,6 +65,9 @@ type transport struct {
// defaultBiz is used when http request without X-Dragonfly-Biz Header // defaultBiz is used when http request without X-Dragonfly-Biz Header
defaultBiz string defaultBiz string
// dumpHTTPContent indicates to dump http request header and response header
dumpHTTPContent bool
} }
// Option is functional config for transport. // Option is functional config for transport.
@ -114,6 +121,13 @@ func WithDefaultBiz(b string) Option {
} }
} }
func WithDumpHTTPContent(b bool) Option {
return func(rt *transport) *transport {
rt.dumpHTTPContent = b
return rt
}
}
// New constructs a new instance of a RoundTripper with additional options. // New constructs a new instance of a RoundTripper with additional options.
func New(options ...Option) (http.RoundTripper, error) { func New(options ...Option) (http.RoundTripper, error) {
rt := &transport{ rt := &transport{
@ -130,19 +144,25 @@ func New(options ...Option) (http.RoundTripper, error) {
// RoundTrip only process first redirect at present // RoundTrip only process first redirect at present
// fix resource release // fix resource release
func (rt *transport) RoundTrip(req *http.Request) (*http.Response, error) { func (rt *transport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
if rt.shouldUseDragonfly(req) { if rt.shouldUseDragonfly(req) {
// delete the Accept-Encoding header to avoid returning the same cached // delete the Accept-Encoding header to avoid returning the same cached
// result for different requests // result for different requests
req.Header.Del("Accept-Encoding") req.Header.Del("Accept-Encoding")
logger.Debugf("round trip with dragonfly: %s", req.URL.String()) logger.Debugf("round trip with dragonfly: %s", req.URL.String())
return rt.download(req) resp, err = rt.download(req)
} else {
logger.Debugf("round trip directly, method: %s, url: %s", req.Method, req.URL.String())
req.Host = req.URL.Host
req.Header.Set("Host", req.Host)
resp, err = rt.baseRoundTripper.RoundTrip(req)
} }
logger.Debugf("round trip directly: %s %s", req.Method, req.URL.String()) if err != nil {
req.Host = req.URL.Host logger.With("method", req.Method, "url", req.URL.String()).
req.Header.Set("Host", req.Host) Errorf("round trip error: %s", err)
}
return rt.baseRoundTripper.RoundTrip(req) rt.processDumpHTTPContent(req, resp)
return resp, err
} }
// NeedUseDragonfly is the default value for shouldUseDragonfly, which downloads all // NeedUseDragonfly is the default value for shouldUseDragonfly, which downloads all
@ -202,14 +222,45 @@ func (rt *transport) download(req *http.Request) (*http.Response, error) {
hdr := httputils.MapToHeader(attr) hdr := httputils.MapToHeader(attr)
log.Infof("download stream attribute: %v", hdr) log.Infof("download stream attribute: %v", hdr)
var contentLength int64 = -1
if l, ok := attr[headers.ContentLength]; ok {
if i, e := strconv.ParseInt(l, 10, 64); e == nil {
contentLength = i
}
}
resp := &http.Response{ resp := &http.Response{
StatusCode: 200, StatusCode: http.StatusOK,
Body: body, Body: body,
Header: hdr, Header: hdr,
ContentLength: contentLength,
Proto: req.Proto,
ProtoMajor: req.ProtoMajor,
ProtoMinor: req.ProtoMinor,
} }
return resp, nil return resp, nil
} }
func (rt *transport) processDumpHTTPContent(req *http.Request, resp *http.Response) {
if !rt.dumpHTTPContent {
return
}
if out, e := httputil.DumpRequest(req, false); e == nil {
logger.Debugf("dump request in transport: %s", string(out))
} else {
logger.Errorf("dump request in transport error: %s", e)
}
if resp == nil {
return
}
if out, e := httputil.DumpResponse(resp, false); e == nil {
logger.Debugf("dump response in transport: %s", string(out))
} else {
logger.Errorf("dump response in transport error: %s", e)
}
}
func defaultHTTPTransport(cfg *tls.Config) *http.Transport { func defaultHTTPTransport(cfg *tls.Config) *http.Transport {
if cfg == nil { if cfg == nil {
cfg = &tls.Config{InsecureSkipVerify: true} cfg = &tls.Config{InsecureSkipVerify: true}

@ -1 +1 @@
Subproject commit 3a576ac0752d4f956bb2feeb62180a703050b43d Subproject commit e3d9e72f64bb2328edeb574872d873d1336349b1

View File

@ -33,6 +33,8 @@ dfdaemon:
hostPath: hostPath:
path: /tmp/artifact path: /tmp/artifact
config: config:
scheduler:
disableAutoBackSource: true
proxy: proxy:
defaultFilter: "Expires&Signature" defaultFilter: "Expires&Signature"
security: security: